Dns discovery custom resolver (#25937)

* Import service discovery from akka-management

* Rename extension to Discovery to go with akka-discovery name
* Rename interafce to ServiceDisovery
* Import config, aggregate and dns
* Discovery documentation
* Load isolated async-dns if not configured as default
* OSGi for discovery
* Remove warning for not using in production
* Fail if old akka management on classpath
* Only allow async dns to be loaded as an additional resolver
* Use method in all of service discovery, not mechanism
* Mima filter
* Add discovery to aggregate
* Set discovery mima versions
* DnsDiscoverySpec: Only run docker test if docker available
This commit is contained in:
Christopher Batey 2018-12-05 13:01:24 +00:00 committed by Patrik Nordwall
parent bca269d684
commit c5c2f951db
26 changed files with 1642 additions and 32 deletions

View file

@ -72,8 +72,8 @@ many in AAAA 2001:985:965:1:ba27:ebff:fe5f:9d2d
many in AAAA 2001:985:965:1:ba27:ebff:fe5f:9d2e
many in AAAA 2001:985:965:1:ba27:ebff:fe5f:9d2f
service.tcp 86400 IN SRV 10 65534 5060 a-single
service.tcp 86400 IN SRV 65533 40 65535 a-double
_service._tcp 86400 IN SRV 10 65534 5060 a-single
_service._tcp 86400 IN SRV 65533 40 65535 a-double
cname-in IN CNAME a-double
cname-ext IN CNAME a-single.bar.example.

View file

@ -125,13 +125,13 @@ class AsyncDnsResolverIntegrationSpec extends AkkaSpec(
}
"resolve SRV record" in {
val name = "service.tcp.foo.test"
val answer = resolve("service.tcp.foo.test", Srv)
val name = "_service._tcp.foo.test"
val answer = resolve(name, Srv)
answer.name shouldEqual name
answer.records.collect { case r: SRVRecord r }.toSet shouldEqual Set(
SRVRecord("service.tcp.foo.test", Ttl.fromPositive(86400.seconds), 10, 65534, 5060, "a-single.foo.test"),
SRVRecord("service.tcp.foo.test", Ttl.fromPositive(86400.seconds), 65533, 40, 65535, "a-double.foo.test")
SRVRecord("_service._tcp.foo.test", Ttl.fromPositive(86400.seconds), 10, 65534, 5060, "a-single.foo.test"),
SRVRecord("_service._tcp.foo.test", Ttl.fromPositive(86400.seconds), 65533, 40, 65535, "a-double.foo.test")
)
}

View file

@ -1,3 +1,6 @@
# Dns discovery custom resolver #25937 - Internal API
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.io.dns.internal.AsyncDnsManager.ext")
# Replaces DNS TTL primitive types with Duration #25850
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.io.SimpleDnsCache.put")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.io.SimpleDnsCache#Cache.put")
@ -54,4 +57,3 @@ ProblemFilters.exclude[DirectMissingMethodProblem]("akka.io.TcpConnection.abort"
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.io.ChannelRegistration.cancel")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.io.ChannelRegistration.cancelAndClose")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.io.SelectionHandler#ChannelRegistryImpl.this")

View file

@ -5,11 +5,13 @@
package akka.io
import java.net.{ Inet4Address, Inet6Address, InetAddress, UnknownHostException }
import java.util.concurrent.ConcurrentHashMap
import akka.actor._
import akka.annotation.InternalApi
import akka.routing.ConsistentHashingRouter.ConsistentHashable
import com.typesafe.config.Config
import java.util.function.{ Function JFunction }
import scala.collection.{ breakOut, immutable }
abstract class Dns {
@ -87,32 +89,77 @@ object Dns extends ExtensionId[DnsExt] with ExtensionIdProvider {
override def get(system: ActorSystem): DnsExt = super.get(system)
}
class DnsExt(val system: ExtendedActorSystem) extends IO.Extension {
class DnsExt private[akka] (val system: ExtendedActorSystem, resolverName: String, managerName: String) extends IO.Extension {
val Settings = new Settings(system.settings.config.getConfig("akka.io.dns"))
private val asyncDns = new ConcurrentHashMap[String, ActorRef]
class Settings private[DnsExt] (_config: Config) {
/**
* INTERNAL API
*
* Load an additional async-dns resolver. Can be used to use async-dns even if inet-resolver is the configuerd
* default.
* Intentionally chosen not to support loading an arbitrary resolver as it required a specific constructor
* for the manager actor. The expected constructor for DNS plugins is just to take in a DnsExt which can't
* be used in this case
*/
@InternalApi
private[akka] def loadAsyncDns(managerName: String): ActorRef = {
// This can't pass in `this` as then AsyncDns would pick up the system settings
asyncDns.computeIfAbsent(managerName, new JFunction[String, ActorRef] {
override def apply(r: String): ActorRef = {
val settings = new Settings(system.settings.config.getConfig("akka.io.dns"), "async-dns")
val provider = system.dynamicAccess.getClassFor[DnsProvider](settings.ProviderObjectName).get.newInstance()
system.log.info("Creating async dns resolver {} with manager name {}", settings.Resolver, managerName)
system.systemActorOf(
props = Props(provider.managerClass, settings.Resolver, system, settings.ResolverConfig, provider.cache, settings.Dispatcher, provider).withDeploy(Deploy.local).withDispatcher(settings.Dispatcher),
name = managerName)
}
})
}
import _config._
/**
* INTERNAL API
*
* Use IO(DNS) or Dns(system). Do not instantiate directly
*
* For binary compat as DnsExt constructor didn't used to have internal API on
*/
@InternalApi
def this(system: ExtendedActorSystem) = this(system, system.settings.config.getString("akka.io.dns.resolver"), "IO-DNS")
val Dispatcher: String = getString("dispatcher")
val Resolver: String = getString("resolver")
val ResolverConfig: Config = getConfig(Resolver)
class Settings private[DnsExt] (config: Config, resolverName: String) {
/**
* Load the default resolver
*/
def this(config: Config) = this(config, config.getString("resolver"))
val Dispatcher: String = config.getString("dispatcher")
val Resolver: String = resolverName
val ResolverConfig: Config = config.getConfig(Resolver)
val ProviderObjectName: String = ResolverConfig.getString("provider-object")
override def toString = s"Settings($Dispatcher, $Resolver, $ResolverConfig, $ProviderObjectName)"
}
// Settings for the system resolver
val Settings: Settings = new Settings(system.settings.config.getConfig("akka.io.dns"), resolverName)
// System DNS resolver
val provider: DnsProvider = system.dynamicAccess.getClassFor[DnsProvider](Settings.ProviderObjectName).get.newInstance()
// System DNS cache
val cache: Dns = provider.cache
// System DNS manager
val manager: ActorRef = {
system.systemActorOf(
props = Props(provider.managerClass, this).withDeploy(Deploy.local).withDispatcher(Settings.Dispatcher),
name = "IO-DNS")
name = managerName)
}
// System DNS manager
def getResolver: ActorRef = manager
}
object IpVersionSelector {

View file

@ -7,14 +7,15 @@ package akka.io.dns.internal
import java.net.InetSocketAddress
import java.util.concurrent.TimeUnit
import akka.actor.{ Actor, ActorLogging, ActorRefFactory, Deploy, Props, Timers }
import akka.actor.{ Actor, ActorLogging, ActorRefFactory, Deploy, ExtendedActorSystem, Props, Timers }
import akka.annotation.InternalApi
import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics }
import akka.io.dns.{ AAAARecord, ARecord, DnsProtocol, DnsSettings }
import akka.io.dns.internal.AsyncDnsManager.CacheCleanup
import akka.io.{ Dns, DnsExt, PeriodicCacheCleanup }
import akka.io.{ Dns, DnsExt, DnsProvider, PeriodicCacheCleanup }
import akka.routing.FromConfig
import akka.util.Timeout
import com.typesafe.config.Config
import scala.concurrent.duration.Duration
@ -30,33 +31,36 @@ private[io] object AsyncDnsManager {
* INTERNAL API
*/
@InternalApi
private[io] final class AsyncDnsManager(val ext: DnsExt) extends Actor
private[io] final class AsyncDnsManager(name: String, system: ExtendedActorSystem, resolverConfig: Config, cache: Dns, dispatcher: String, provider: DnsProvider) extends Actor
with RequiresMessageQueue[UnboundedMessageQueueSemantics] with ActorLogging with Timers {
import akka.pattern.ask
import akka.pattern.pipe
/**
* Ctr expected by the DnsExt for all DnsMangers
*/
def this(ext: DnsExt) = this(ext.Settings.Resolver, ext.system, ext.Settings.ResolverConfig, ext.cache, ext.Settings.Dispatcher, ext.provider)
implicit val ec = context.dispatcher
private var oldProtocolWarningLoggedTimes = 0
val settings = new DnsSettings(ext.system, ext.Settings.ResolverConfig)
val settings = new DnsSettings(system, resolverConfig)
implicit val timeout = Timeout(settings.ResolveTimeout)
private val resolver = {
val props: Props = FromConfig.props(Props(ext.provider.actorClass, settings, ext.cache, (factory: ActorRefFactory, dns: List[InetSocketAddress]) {
val props: Props = FromConfig.props(Props(provider.actorClass, settings, cache, (factory: ActorRefFactory, dns: List[InetSocketAddress]) {
dns.map(ns factory.actorOf(Props(new DnsClient(ns))))
}).withDeploy(Deploy.local).withDispatcher(ext.Settings.Dispatcher))
context.actorOf(props, ext.Settings.Resolver)
}).withDeploy(Deploy.local).withDispatcher(dispatcher))
context.actorOf(props, name)
}
private val cacheCleanup = ext.cache match {
private val cacheCleanup = cache match {
case cleanup: PeriodicCacheCleanup Some(cleanup)
case _ None
}
override def preStart(): Unit = {
cacheCleanup.foreach { _
val interval = Duration(ext.Settings.ResolverConfig.getDuration("cache-cleanup-interval", TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS)
val interval = Duration(resolverConfig.getDuration("cache-cleanup-interval", TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS)
timers.startPeriodicTimer(CacheCleanup, CacheCleanup, interval)
}
}

View file

@ -7,4 +7,4 @@ You can run them like:
project akka-bench-jmh
jmh:run -i 3 -wi 3 -f 1 .*ActorCreationBenchmark
Use 'jmh:run -h' to get an overview of the availabe options.
Use 'jmh:run -h' to get an overview of the available options.

View file

@ -0,0 +1,75 @@
######################################################
# Akka Discovery Config #
######################################################
akka.actor.deployment {
"/SD-DNS/async-dns" {
mailbox = "unbounded"
router = "round-robin-pool"
nr-of-instances = 1
}
}
akka.discovery {
# Users MUST configure this value to set the default discovery method.
#
# The value can be an implementation config path name, such as "akka-dns",
# which would attempt to resolve as `akka.discovery.akka-dns` which is expected
# to contain a `class` setting. As fallback, the root `akka-dns` setting scope
# would be used. If none of those contained a `class` setting, then the value is
# assumed to be a class name, and an attempt is made to instantiate it.
method = "<method>"
# Config based service discovery
config {
class = akka.discovery.config.ConfigServiceDiscovery
# Location of the services in configuration
services-path = "akka.discovery.config.services"
# A map of services to resolve from configuration.
# See docs for more examples.
# A list of endpoints with host/port where port is optional e.g.
# services {
# service1 {
# endpoints = [
# {
# host = "cat.com"
# port = 1233
# },
# {
# host = "dog.com"
# }
# ]
# },
# service2 {
# endpoints = [
# {
# host = "fish.com"
# port = 1233
# }
# ]
# }
# }
services = {
}
}
# Aggregate multiple service discovery mechaisms
aggregate {
class = akka.discovery.aggregate.AggregateServiceDiscovery
# List of service discovery methods to try in order. E.g DNS then fall back to config
# ["akka-dns" , "confg" ]
discovery-methods = []
}
# DNS based service discovery
akka-dns {
class = akka.discovery.dns.DnsServiceDiscovery
}
}

View file

@ -0,0 +1,119 @@
/*
* Copyright (C) 2017-2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.discovery
import java.util.concurrent.ConcurrentHashMap
import java.util.function.{ Function JFunction }
import akka.actor._
import akka.annotation.InternalApi
import scala.util.{ Failure, Success, Try }
final class Discovery(implicit system: ExtendedActorSystem) extends Extension {
Discovery.checkClassPathForOldDiscovery(system)
private val implementations = new ConcurrentHashMap[String, ServiceDiscovery]
private val factory = new JFunction[String, ServiceDiscovery] {
override def apply(method: String): ServiceDiscovery = createServiceDiscovery(method)
}
private lazy val _defaultImplMethod =
system.settings.config.getString("akka.discovery.method") match {
case "<method>"
throw new IllegalArgumentException(
"No default service discovery implementation configured in " +
"`akka.discovery.method`. Make sure to configure this setting to your preferred implementation such as " +
"'akka-dns' in your application.conf (from the akka-discovery module).")
case method method
}
private lazy val defaultImpl = loadServiceDiscovery(_defaultImplMethod)
/**
* Default [[ServiceDiscovery]] as configured in `akka.discovery.method`.
*/
@throws[IllegalArgumentException]
def discovery: ServiceDiscovery = defaultImpl
/**
* Create a [[ServiceDiscovery]] from configuration property.
* The given `method` parameter is used to find configuration property
* "akka.discovery.[method].class" or "[method].class". `method` can also
* be a fully class name.
*
* The `ServiceDiscovery` instance for a given `method` will be created
* once and subsequent requests for the same `method` will return the same instance.
*/
def loadServiceDiscovery(method: String): ServiceDiscovery = {
implementations.computeIfAbsent(method, factory)
}
/**
* INTERNAL API
*/
@InternalApi
private def createServiceDiscovery(method: String): ServiceDiscovery = {
val config = system.settings.config
val dynamic = system.dynamicAccess
def classNameFromConfig(path: String): String =
if (config.hasPath(path)) config.getString(path)
else throw new IllegalArgumentException(s"$path must contain field `class` that is a FQN of a `akka.discovery.ServiceDiscovery` implementation")
def create(clazzName: String): Try[ServiceDiscovery] = {
dynamic
.createInstanceFor[ServiceDiscovery](clazzName, (classOf[ExtendedActorSystem] system) :: Nil)
.recoverWith {
case _: ClassNotFoundException | _: NoSuchMethodException
dynamic.createInstanceFor[ServiceDiscovery](clazzName, (classOf[ActorSystem] system) :: Nil)
}
.recoverWith {
case _: ClassNotFoundException | _: NoSuchMethodException
dynamic.createInstanceFor[ServiceDiscovery](clazzName, Nil)
}
}
val configName = "akka.discovery." + method + ".class"
val instanceTry = create(classNameFromConfig(configName))
instanceTry match {
case Failure(e @ (_: ClassNotFoundException | _: NoSuchMethodException))
throw new IllegalArgumentException(
s"Illegal [$configName] value or incompatible class! " +
"The implementation class MUST extend akka.discovery.ServiceDiscovery and take an " +
"ExtendedActorSystem as constructor argument.", e)
case Failure(e) throw e
case Success(instance) instance
}
}
}
object Discovery extends ExtensionId[Discovery] with ExtensionIdProvider {
override def apply(system: ActorSystem): Discovery = super.apply(system)
override def lookup: Discovery.type = Discovery
override def get(system: ActorSystem): Discovery = super.get(system)
override def createExtension(system: ExtendedActorSystem): Discovery = new Discovery()(system)
/**
* INTERNAL API
*/
@InternalApi
private[akka] def checkClassPathForOldDiscovery(system: ExtendedActorSystem): Unit = {
try {
system.dynamicAccess.getClassFor("akka.discovery.SimpleServiceDiscovery").get
throw new RuntimeException("Old version of Akka Discovery from Akka Management found on the classpath. Remove `com.lightbend.akka.discovery:akka-discovery` from the classpath..")
} catch {
case _: ClassNotFoundException // all good
}
}
}

View file

@ -0,0 +1,173 @@
/*
* Copyright (C) 2017-2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.discovery
import java.net.InetAddress
import java.util.Optional
import java.util.concurrent.CompletionStage
import java.util.concurrent.TimeUnit
import akka.actor.DeadLetterSuppression
import akka.annotation.ApiMayChange
import scala.collection.immutable
import scala.concurrent.Future
import scala.concurrent.duration.FiniteDuration
import scala.util.Try
@ApiMayChange
object ServiceDiscovery {
/** Result of a successful resolve request */
final case class Resolved(serviceName: String, addresses: immutable.Seq[ResolvedTarget])
extends DeadLetterSuppression {
/**
* Java API
*/
def getAddresses: java.util.List[ResolvedTarget] = {
import scala.collection.JavaConverters._
addresses.asJava
}
}
object ResolvedTarget {
// Simply compare the bytes of the address.
// This may not work in exotic cases such as IPv4 addresses encoded as IPv6 addresses.
private implicit val inetAddressOrdering: Ordering[InetAddress] =
Ordering.by[InetAddress, Iterable[Byte]](_.getAddress)
implicit val addressOrdering: Ordering[ResolvedTarget] = Ordering.by { t
(t.address, t.host, t.port)
}
def apply(host: String, port: Option[Int]): ResolvedTarget =
ResolvedTarget(host, port, Try(InetAddress.getByName(host)).toOption)
}
/**
* Resolved target host, with optional port and the IP address.
* @param host the hostname or the IP address of the target
* @param port optional port number
* @param address optional IP address of the target. This is used during cluster bootstap when available.
*/
final case class ResolvedTarget(
host: String,
port: Option[Int],
address: Option[InetAddress]
) {
/**
* Java API
*/
def getPort: Optional[Int] = {
import scala.compat.java8.OptionConverters._
port.asJava
}
/**
* Java API
*/
def getAddress: Optional[InetAddress] = {
import scala.compat.java8.OptionConverters._
address.asJava
}
}
}
/**
* A service lookup. It is up to each method to decide
* what to do with the optional portName and protocol fields.
* For example `portName` could be used to distinguish between
* Akka remoting ports and HTTP ports.
*
*/
@ApiMayChange
final case class Lookup(serviceName: String, portName: Option[String], protocol: Option[String]) {
/**
* Which port for a service e.g. Akka remoting or HTTP.
* Maps to "service" for an SRV records.
*/
def withPortName(value: String): Lookup = copy(portName = Some(value))
/**
* Which protocol e.g. TCP or UDP.
* Maps to "protocol" for SRV records.
*/
def withProtocol(value: String): Lookup = copy(protocol = Some(value))
}
@ApiMayChange
case object Lookup {
/**
* Create a service Lookup with only a serviceName.
* Use withPortName and withProtocol to provide optional portName
* and protocol
*/
def apply(serviceName: String): Lookup = new Lookup(serviceName, None, None)
/**
* Java API
*
* Create a service Lookup with only a serviceName.
* Use withPortName and withProtocol to provide optional portName
* and protocol
*/
def create(serviceName: String): Lookup = new Lookup(serviceName, None, None)
}
/**
* Implement to provide a service discovery method
*
*/
@ApiMayChange
abstract class ServiceDiscovery {
import ServiceDiscovery._
/**
* Scala API: Perform lookup using underlying discovery implementation.
*
* @param lookup A service discovery lookup.
* @param resolveTimeout Timeout. Up to the discovery-method to adhere to his
*/
def lookup(lookup: Lookup, resolveTimeout: FiniteDuration): Future[Resolved]
/**
* Scala API: Perform lookup using underlying discovery implementation.
*
* Convenience for when only a name is required.
*/
def lookup(serviceName: String, resolveTimeout: FiniteDuration): Future[Resolved] =
lookup(Lookup(serviceName), resolveTimeout)
/**
* Java API: Perform basic lookup using underlying discovery implementation.
*
* While the implementation may provide other settings and ways to configure timeouts,
* the passed `resolveTimeout` should never be exceeded, as it signals the application's
* eagerness to wait for a result for this specific lookup.
*
* The returned future SHOULD be failed once resolveTimeout has passed.
*
*/
def lookup(query: Lookup, resolveTimeout: java.time.Duration): CompletionStage[Resolved] = {
import scala.compat.java8.FutureConverters._
lookup(query, FiniteDuration(resolveTimeout.toMillis, TimeUnit.MILLISECONDS)).toJava
}
/**
* Java API
*
* @param serviceName A name, see discovery-method's docs for how this is interpreted
* @param resolveTimeout Timeout. Up to the discovery-methodto adhere to his
*/
def lookup(serviceName: String, resolveTimeout: java.time.Duration): CompletionStage[Resolved] =
lookup(Lookup(serviceName), resolveTimeout)
}

View file

@ -0,0 +1,93 @@
/*
* Copyright (C) 2017-2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.discovery.aggregate
import akka.actor.ExtendedActorSystem
import akka.annotation.InternalApi
import akka.discovery.ServiceDiscovery.Resolved
import akka.discovery.aggregate.AggregateServiceDiscovery.Methods
import akka.discovery.{ Discovery, Lookup, ServiceDiscovery }
import akka.event.Logging
import akka.util.Helpers.Requiring
import com.typesafe.config.Config
import scala.collection.JavaConverters._
import scala.concurrent.Future
import scala.concurrent.duration.FiniteDuration
import scala.util.control.NonFatal
/**
* INTERNAL API
*/
@InternalApi
private final class AggregateServiceDiscoverySettings(config: Config) {
val discoveryMethods = config
.getStringList("discovery-methods")
.asScala
.toList
.requiring(_.nonEmpty, "At least one discovery method should be specified")
}
/**
* INTERNAL API
*/
@InternalApi
private object AggregateServiceDiscovery {
type Methods = List[(String, ServiceDiscovery)]
}
/**
* INTERNAL API
*/
@InternalApi
private[akka] final class AggregateServiceDiscovery(system: ExtendedActorSystem) extends ServiceDiscovery {
private val log = Logging(system, getClass)
private val settings =
new AggregateServiceDiscoverySettings(system.settings.config.getConfig("akka.discovery.aggregate"))
private val methods = {
val serviceDiscovery = Discovery(system)
settings.discoveryMethods.map(mech (mech, serviceDiscovery.loadServiceDiscovery(mech)))
}
private implicit val ec = system.dispatcher
/**
* Each discovery method is given the resolveTimeout rather than reducing it each time between methods.
*/
override def lookup(lookup: Lookup, resolveTimeout: FiniteDuration): Future[Resolved] =
resolve(methods, lookup, resolveTimeout)
private def resolve(sds: Methods, query: Lookup, resolveTimeout: FiniteDuration): Future[Resolved] = {
sds match {
case (method, next) :: Nil
log.debug("Looking up [{}] with [{}]", query, method)
next.lookup(query, resolveTimeout)
case (method, next) :: tail
log.debug("Looking up [{}] with [{}]", query, method)
// If nothing comes back then try the next one
next
.lookup(query, resolveTimeout)
.flatMap { resolved
if (resolved.addresses.isEmpty) {
log.debug("Method[{}] returned no ResolvedTargets, trying next", query)
resolve(tail, query, resolveTimeout)
} else
Future.successful(resolved)
}
.recoverWith {
case NonFatal(t)
log.error(t, "[{}] Service discovery failed. Trying next discovery method", method)
resolve(tail, query, resolveTimeout)
}
case Nil
// this is checked in `discoveryMethods`, but silence compiler warning
throw new IllegalStateException("At least one discovery method should be specified")
}
}
}

View file

@ -0,0 +1,65 @@
/*
* Copyright (C) 2017-2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.discovery.config
import akka.actor.ExtendedActorSystem
import akka.annotation.InternalApi
import akka.discovery.{ Lookup, ServiceDiscovery }
import akka.discovery.ServiceDiscovery.{ Resolved, ResolvedTarget }
import akka.event.Logging
import com.typesafe.config.Config
import scala.collection.JavaConverters._
import scala.collection.{ breakOut, immutable }
import scala.concurrent.Future
import scala.concurrent.duration.FiniteDuration
/**
* INTERNAL API
*/
@InternalApi
private object ConfigServicesParser {
def parse(config: Config): Map[String, Resolved] = {
val byService = config
.root()
.entrySet()
.asScala
.map { en
(en.getKey, config.getConfig(en.getKey))
}
.toMap
byService.map {
case (serviceName, full)
val endpoints = full.getConfigList("endpoints").asScala.toList
val resolvedTargets = endpoints.map { c
val host = c.getString("host")
val port = if (c.hasPath("port")) Some(c.getInt("port")) else None
ResolvedTarget(host = host, port = port, address = None)
}
(serviceName, Resolved(serviceName, resolvedTargets))
}
}
}
/**
* INTERNAL API
*/
@InternalApi
private[akka] class ConfigServiceDiscovery(system: ExtendedActorSystem) extends ServiceDiscovery {
private val log = Logging(system, getClass)
private val resolvedServices = ConfigServicesParser.parse(
system.settings.config.getConfig(system.settings.config.getString("akka.discovery.config.services-path"))
)
log.debug("Config discovery serving: {}", resolvedServices)
override def lookup(lookup: Lookup, resolveTimeout: FiniteDuration): Future[Resolved] = {
Future.successful(resolvedServices.getOrElse(lookup.serviceName, Resolved(lookup.serviceName, Nil)))
}
}

View file

@ -0,0 +1,111 @@
/*
* Copyright (C) 2017-2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.discovery.dns
import java.net.InetAddress
import akka.actor.ExtendedActorSystem
import akka.annotation.InternalApi
import akka.discovery.ServiceDiscovery.{ Resolved, ResolvedTarget }
import akka.event.Logging
import akka.io.{ Dns, IO }
import akka.pattern.ask
import scala.concurrent.Future
import scala.concurrent.duration.FiniteDuration
import akka.discovery._
import akka.io.dns.DnsProtocol.{ Ip, Srv }
import akka.io.dns.{ AAAARecord, ARecord, DnsProtocol, SRVRecord }
import scala.collection.{ immutable im }
/**
* INTERNAL API
*/
@InternalApi
private object DnsServiceDiscovery {
def srvRecordsToResolved(srvRequest: String, resolved: DnsProtocol.Resolved): Resolved = {
val ips: Map[String, im.Seq[InetAddress]] =
resolved.additionalRecords.foldLeft(Map.empty[String, im.Seq[InetAddress]]) {
case (acc, a: ARecord)
acc.updated(a.name, a.ip +: acc.getOrElse(a.name, Nil))
case (acc, a: AAAARecord)
acc.updated(a.name, a.ip +: acc.getOrElse(a.name, Nil))
case (acc, _)
acc
}
val addresses = resolved.records.flatMap {
case srv: SRVRecord
val addresses = ips.getOrElse(srv.target, Nil).map(ip ResolvedTarget(srv.target, Some(srv.port), Some(ip)))
if (addresses.isEmpty) {
im.Seq(ResolvedTarget(srv.target, Some(srv.port), None))
} else {
addresses
}
case other im.Seq.empty[ResolvedTarget]
}
Resolved(srvRequest, addresses)
}
}
/**
* INTERNAL API
*/
@InternalApi
private[akka] class DnsServiceDiscovery(system: ExtendedActorSystem) extends ServiceDiscovery {
import DnsServiceDiscovery._
import ServiceDiscovery._
private val log = Logging(system, getClass)
private val dns = if (system.settings.config.getString("akka.io.dns.resolver") == "async-dns") {
log.debug("using system resolver as it is set to async-dns")
IO(Dns)(system)
} else {
log.debug("system resolver is not async-dns. Loading isolated resolver")
Dns(system).loadAsyncDns("SD-DNS")
}
import system.dispatcher
private def cleanIpString(ipString: String): String =
if (ipString.startsWith("/")) ipString.tail else ipString
override def lookup(lookup: Lookup, resolveTimeout: FiniteDuration): Future[Resolved] = {
lookup match {
case Lookup(name, Some(portName), Some(protocol))
val srvRequest = s"_$portName._$protocol.$name"
log.debug("Lookup [{}] translated to SRV query [{}] as contains portName and protocol", lookup, srvRequest)
dns.ask(DnsProtocol.Resolve(srvRequest, Srv))(resolveTimeout).map {
case resolved: DnsProtocol.Resolved
log.debug("Resolved Dns.Resolved: {}", resolved)
srvRecordsToResolved(srvRequest, resolved)
case resolved
log.warning("Resolved UNEXPECTED (resolving to Nil): {}", resolved.getClass)
Resolved(srvRequest, Nil)
}
case _
log.debug("Lookup[{}] translated to A/AAAA lookup as does not have portName and protocol", lookup)
dns.ask(DnsProtocol.Resolve(lookup.serviceName, Ip()))(resolveTimeout).map {
case resolved: DnsProtocol.Resolved
log.debug("Resolved Dns.Resolved: {}", resolved)
val addresses = resolved.records.collect {
case a: ARecord ResolvedTarget(cleanIpString(a.ip.getHostAddress), None, Some(a.ip))
case a: AAAARecord ResolvedTarget(cleanIpString(a.ip.getHostAddress), None, Some(a.ip))
}
Resolved(lookup.serviceName, addresses)
case resolved
log.warning("Resolved UNEXPECTED (resolving to Nil): {}", resolved.getClass)
Resolved(lookup.serviceName, Nil)
}
}
}
}

View file

@ -0,0 +1,36 @@
/*
* Copyright (C) 2017-2018 Lightbend Inc. <https://www.lightbend.com>
*/
package jdoc.akka.discovery;
import akka.actor.ActorSystem;
import akka.discovery.Lookup;
import akka.discovery.Discovery;
import akka.discovery.ServiceDiscovery;
import java.time.Duration;
import java.util.concurrent.CompletionStage;
public class CompileOnlyTest {
public static void example() {
//#loading
ActorSystem as = ActorSystem.create();
ServiceDiscovery serviceDiscovery = Discovery.get(as).discovery();
//#loading
//#basic
serviceDiscovery.lookup(Lookup.create("akka.io"), Duration.ofSeconds(1));
// convenience for a Lookup with only a serviceName
serviceDiscovery.lookup("akka.io", Duration.ofSeconds(1));
//#basic
//#full
CompletionStage<ServiceDiscovery.Resolved> lookup = serviceDiscovery.lookup(Lookup.create("akka.io").withPortName("remoting").withProtocol("tcp"), Duration.ofSeconds(1));
//#full
// not-used warning
lookup.thenAccept(System.out::println);
}
}

View file

@ -0,0 +1,144 @@
/*
* Copyright (C) 2017-2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.discovery
import scala.concurrent.Future
import scala.concurrent.duration.FiniteDuration
import akka.actor.ActorSystem
import akka.discovery.ServiceDiscovery.Resolved
import akka.testkit.TestKit
import com.typesafe.config.ConfigFactory
import org.scalatest.Matchers
import org.scalatest.WordSpec
class DiscoveryConfigurationSpec extends WordSpec with Matchers {
"ServiceDiscovery" should {
"throw when no default discovery configured" in {
val sys = ActorSystem("DiscoveryConfigurationSpec")
try {
val ex = intercept[Exception] {
Discovery(sys).discovery
}
ex.getMessage should include("No default service discovery implementation configured")
} finally TestKit.shutdownActorSystem(sys)
}
"select implementation from config by config name (inside akka.discovery namespace)" in {
val className = classOf[FakeTestDiscovery].getCanonicalName
val sys = ActorSystem("DiscoveryConfigurationSpec", ConfigFactory.parseString(s"""
akka.discovery {
method = akka-mock-inside
akka-mock-inside {
class = $className
}
}
""").withFallback(ConfigFactory.load()))
try Discovery(sys).discovery.getClass.getCanonicalName should ===(className)
finally TestKit.shutdownActorSystem(sys)
}
"load another implementation from config by config name" in {
val className1 = classOf[FakeTestDiscovery].getCanonicalName
val className2 = classOf[FakeTestDiscovery2].getCanonicalName
val sys = ActorSystem("DiscoveryConfigurationSpec", ConfigFactory.parseString(s"""
akka.discovery {
method = mock1
mock1 {
class = $className1
}
mock2 {
class = $className2
}
}
""").withFallback(ConfigFactory.load()))
try {
Discovery(sys).discovery.getClass.getCanonicalName should ===(className1)
Discovery(sys).loadServiceDiscovery("mock2").getClass.getCanonicalName should ===(className2)
} finally TestKit.shutdownActorSystem(sys)
}
"return same instance for same method" in {
val className1 = classOf[FakeTestDiscovery].getCanonicalName
val className2 = classOf[FakeTestDiscovery2].getCanonicalName
val sys = ActorSystem("DiscoveryConfigurationSpec", ConfigFactory.parseString(s"""
akka.discovery {
method = mock1
mock1 {
class = $className1
}
mock2 {
class = $className2
}
}
""").withFallback(ConfigFactory.load()))
try {
Discovery(sys).loadServiceDiscovery("mock2") should be theSameInstanceAs Discovery(sys)
.loadServiceDiscovery("mock2")
Discovery(sys).discovery should be theSameInstanceAs Discovery(sys).loadServiceDiscovery("mock1")
} finally TestKit.shutdownActorSystem(sys)
}
"throw a specific discovery method exception" in {
val className = classOf[ExceptionThrowingDiscovery].getCanonicalName
val sys = ActorSystem("DiscoveryConfigurationSpec", ConfigFactory.parseString(s"""
akka.discovery {
method = "mock1"
mock1 {
class = $className
}
}
""").withFallback(ConfigFactory.load()))
try {
an[DiscoveryException] should be thrownBy Discovery(sys).discovery
} finally TestKit.shutdownActorSystem(sys)
}
"throw an illegal argument exception for not existing method" in {
val className = "className"
val sys = ActorSystem("DiscoveryConfigurationSpec", ConfigFactory.parseString(s"""
akka.discovery {
method = "$className"
}
""").withFallback(ConfigFactory.load()))
try {
an[IllegalArgumentException] should be thrownBy Discovery(sys).discovery
} finally TestKit.shutdownActorSystem(sys)
}
}
}
class FakeTestDiscovery extends ServiceDiscovery {
override def lookup(lookup: Lookup, resolveTimeout: FiniteDuration): Future[Resolved] = ???
}
class FakeTestDiscovery2 extends FakeTestDiscovery
class DiscoveryException(message: String) extends Exception
class ExceptionThrowingDiscovery extends ServiceDiscovery {
def lookup(lookup: Lookup, resolveTimeout: FiniteDuration): Future[Resolved] = ???
throw new DiscoveryException("Test Exception")
}

View file

@ -0,0 +1,122 @@
/*
* Copyright (C) 2017-2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.discovery.aggregate
import akka.actor.{ ActorSystem, ExtendedActorSystem }
import akka.discovery.ServiceDiscovery.{ Resolved, ResolvedTarget }
import akka.discovery.{ Lookup, Discovery, ServiceDiscovery }
import akka.testkit.TestKit
import com.typesafe.config.{ Config, ConfigFactory }
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpecLike }
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.collection.immutable
class StubbedServiceDiscovery(system: ExtendedActorSystem) extends ServiceDiscovery {
override def lookup(query: Lookup, resolveTimeout: FiniteDuration): Future[Resolved] = {
if (query.serviceName == "stubbed") {
Future.successful(Resolved(
query.serviceName,
immutable.Seq(
ResolvedTarget(host = "stubbed1", port = Some(1234), address = None)
)))
} else if (query.serviceName == "fail") {
Future.failed(new RuntimeException("No resolving for you!"))
} else {
Future.successful(Resolved(query.serviceName, immutable.Seq.empty))
}
}
}
object AggregateServiceDiscoverySpec {
val config: Config = ConfigFactory.parseString("""
akka {
loglevel = DEBUG
discovery {
method = aggregate
aggregate {
discovery-methods = ["stubbed1", "config"]
}
}
}
akka.discovery.stubbed1 {
class = akka.discovery.aggregate.StubbedServiceDiscovery
}
akka.discovery.config.services = {
config1 = {
endpoints = [
{
host = "cat"
port = 1233
},
{
host = "dog"
port = 1234
}
]
},
fail = {
endpoints = [
{
host = "from-config"
}
]
}
}
""")
}
class AggregateServiceDiscoverySpec
extends TestKit(ActorSystem("AggregateDiscoverySpec", AggregateServiceDiscoverySpec.config))
with WordSpecLike
with Matchers
with BeforeAndAfterAll
with ScalaFutures {
override protected def afterAll(): Unit = {
TestKit.shutdownActorSystem(system)
}
val discovery: ServiceDiscovery = Discovery(system).discovery
"Aggregate service discovery" must {
"only call first one if returns results" in {
val results = discovery.lookup("stubbed", 100.millis).futureValue
results shouldEqual Resolved(
"stubbed",
immutable.Seq(
ResolvedTarget(host = "stubbed1", port = Some(1234), address = None)
))
}
"move onto the next if no resolved targets" in {
val results = discovery.lookup("config1", 100.millis).futureValue
results shouldEqual Resolved(
"config1",
immutable.Seq(
ResolvedTarget(host = "cat", port = Some(1233), address = None),
ResolvedTarget(host = "dog", port = Some(1234), address = None)
))
}
"move onto next if fails" in {
val results = discovery.lookup("fail", 100.millis).futureValue
// Stub fails then result comes from config
results shouldEqual Resolved(
"fail",
immutable.Seq(
ResolvedTarget(host = "from-config", port = None, address = None)
))
}
}
}

View file

@ -0,0 +1,78 @@
/*
* Copyright (C) 2017-2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.discovery.config
import akka.actor.ActorSystem
import akka.discovery.Discovery
import akka.discovery.ServiceDiscovery.ResolvedTarget
import akka.testkit.TestKit
import com.typesafe.config.{ Config, ConfigFactory }
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpecLike }
import scala.concurrent.duration._
import scala.collection.immutable
object ConfigServiceDiscoverySpec {
val config: Config = ConfigFactory.parseString("""
akka {
loglevel = DEBUG
discovery {
method = config
config {
services = {
service1 = {
endpoints = [
{
host = "cat"
port = 1233
},
{
host = "dog"
}
]
},
service2 = {
endpoints = []
}
}
}
}
}
""")
}
class ConfigServiceDiscoverySpec
extends TestKit(ActorSystem("ConfigDiscoverySpec", ConfigServiceDiscoverySpec.config))
with WordSpecLike
with Matchers
with BeforeAndAfterAll
with ScalaFutures {
override protected def afterAll(): Unit = {
TestKit.shutdownActorSystem(system)
}
val discovery = Discovery(system).discovery
"Config discovery" must {
"load from config" in {
val result = discovery.lookup("service1", 100.millis).futureValue
result.serviceName shouldEqual "service1"
result.addresses shouldEqual immutable.Seq(
ResolvedTarget(host = "cat", port = Some(1233), address = None),
ResolvedTarget(host = "dog", port = None, address = None)
)
}
"return no resolved targets if not in config" in {
val result = discovery.lookup("dontexist", 100.millis).futureValue
result.serviceName shouldEqual "dontexist"
result.addresses shouldEqual immutable.Seq.empty
}
}
}

View file

@ -0,0 +1,52 @@
/*
* Copyright (C) 2017-2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.discovery.config
import akka.discovery.ServiceDiscovery.{ Resolved, ResolvedTarget }
import akka.discovery.config.ConfigServicesParserSpec._
import com.typesafe.config.{ Config, ConfigFactory }
import org.scalatest.{ Matchers, WordSpec }
import scala.collection.immutable
object ConfigServicesParserSpec {
val exampleConfig: Config = ConfigFactory.parseString("""
services {
service1 {
endpoints = [
{
host = "cat"
port = 1233
},
{
host = "dog"
}
]
},
service2 {
endpoints = []
}
}
""".stripMargin)
}
class ConfigServicesParserSpec extends WordSpec with Matchers {
"Config parsing" must {
"parse" in {
val config = exampleConfig.getConfig("services")
val result = ConfigServicesParser.parse(config)
result("service1") shouldEqual Resolved(
"service1",
immutable.Seq(
ResolvedTarget(host = "cat", port = Some(1233), address = None),
ResolvedTarget(host = "dog", port = None, address = None)
))
result("service2") shouldEqual Resolved("service2", immutable.Seq())
}
}
}

View file

@ -0,0 +1,119 @@
/*
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.discovery.dns
import java.net.InetAddress
import akka.actor.ActorSystem
import akka.discovery.{ Discovery, Lookup }
import akka.discovery.ServiceDiscovery.ResolvedTarget
import akka.io.dns.DockerBindDnsService
import akka.testkit.{ AkkaSpec, SocketUtil, TestKit }
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._
object DnsDiscoverySpec {
val config = ConfigFactory.parseString(
s"""
//#configure-dns
akka {
discovery {
method = akka-dns
}
}
//#configure-dns
akka {
loglevel = DEBUG
}
akka.io.dns.async-dns.nameservers = ["localhost:${DnsDiscoverySpec.dockerDnsServerPort}"]
""")
lazy val dockerDnsServerPort = SocketUtil.temporaryLocalPort()
val configWithAsyncDnsResolverAsDefault = ConfigFactory.parseString(
"""
akka.io.dns.resolver = "async-dns"
""").withFallback(config)
}
class DnsDiscoverySpec extends AkkaSpec(DnsDiscoverySpec.config)
with DockerBindDnsService {
import DnsDiscoverySpec._
override val hostPort: Int = DnsDiscoverySpec.dockerDnsServerPort
val systemWithAsyncDnsAsResolver = ActorSystem("AsyncDnsSystem", configWithAsyncDnsResolverAsDefault)
"Dns Discovery with isolated resolver" must {
if (!dockerAvailable()) {
system.log.error("Test not run as docker is not available")
pending
}
"work with SRV records" in {
val discovery = Discovery(system).discovery
val name = "_service._tcp.foo.test."
val result =
discovery
.lookup(Lookup("foo.test.").withPortName("service").withProtocol("tcp"), resolveTimeout = 10.seconds)
.futureValue
result.addresses.toSet shouldEqual Set(
ResolvedTarget("a-single.foo.test", Some(5060), Some(InetAddress.getByName("192.168.1.20"))),
ResolvedTarget("a-double.foo.test", Some(65535), Some(InetAddress.getByName("192.168.1.21"))),
ResolvedTarget("a-double.foo.test", Some(65535), Some(InetAddress.getByName("192.168.1.22")))
)
result.serviceName shouldEqual name
}
"work with IP records" in {
val discovery = Discovery(system).discovery
val name = "a-single.foo.test"
val result = discovery.lookup(name, resolveTimeout = 500.milliseconds).futureValue
result.serviceName shouldEqual name
result.addresses.toSet shouldEqual Set(
ResolvedTarget("192.168.1.20", None)
)
}
"be using its own resolver" in {
// future will fail if it it doesn't exist
system.actorSelection("/system/SD-DNS/async-dns").resolveOne(2.seconds).futureValue
}
}
"Dns discovery with the system resolver" must {
"work with SRV records" in {
val discovery = Discovery(systemWithAsyncDnsAsResolver).discovery
val name = "_service._tcp.foo.test."
val result =
discovery
.lookup(Lookup("foo.test.").withPortName("service").withProtocol("tcp"), resolveTimeout = 10.seconds)
.futureValue
result.addresses.toSet shouldEqual Set(
ResolvedTarget("a-single.foo.test", Some(5060), Some(InetAddress.getByName("192.168.1.20"))),
ResolvedTarget("a-double.foo.test", Some(65535), Some(InetAddress.getByName("192.168.1.21"))),
ResolvedTarget("a-double.foo.test", Some(65535), Some(InetAddress.getByName("192.168.1.22")))
)
result.serviceName shouldEqual name
}
"be using the system resolver" in {
// check the service discovery one doesn't exist
systemWithAsyncDnsAsResolver.actorSelection("/system/SD-DNS/async-dns").resolveOne(2.seconds).failed.futureValue
}
}
override def afterTermination(): Unit = {
super.afterTermination()
TestKit.shutdownActorSystem(system)
}
}

View file

@ -0,0 +1,67 @@
/*
* Copyright (C) 2017-2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.discovery.dns
import java.net.{ Inet6Address, InetAddress }
import akka.discovery.ServiceDiscovery
import akka.discovery.ServiceDiscovery.{ Resolved, ResolvedTarget }
import akka.io.dns.CachePolicy.Ttl
import akka.io.dns.{ AAAARecord, ARecord, DnsProtocol, SRVRecord }
import org.scalatest.{ Matchers, WordSpec }
import scala.collection.{ immutable im }
import scala.concurrent.duration._
class DnsServiceDiscoverySpec extends WordSpec with Matchers {
"srvRecordsToResolved" must {
"fill in ips from A records" in {
val resolved = DnsProtocol.Resolved("cats.com", im.Seq(new SRVRecord("cats.com", Ttl.fromPositive(1.second), 2, 3, 4, "kittens.com")),
im.Seq(
new ARecord("kittens.com", Ttl.fromPositive(1.second), InetAddress.getByName("127.0.0.2")),
new ARecord("kittens.com", Ttl.fromPositive(1.second), InetAddress.getByName("127.0.0.3")),
new ARecord("donkeys.com", Ttl.fromPositive(1.second), InetAddress.getByName("127.0.0.4"))
))
val result: ServiceDiscovery.Resolved =
DnsServiceDiscovery.srvRecordsToResolved("cats.com", resolved)
result.serviceName shouldEqual "cats.com"
result.addresses.toSet shouldEqual Set(
ResolvedTarget("kittens.com", Some(4), Some(InetAddress.getByName("127.0.0.2"))),
ResolvedTarget("kittens.com", Some(4), Some(InetAddress.getByName("127.0.0.3")))
)
}
// Naughty DNS server
"use SRV target and port if no additional records" in {
val resolved = DnsProtocol.Resolved("cats.com", im.Seq(new SRVRecord("cats.com", Ttl.fromPositive(1.second), 2, 3, 8080, "kittens.com")),
im.Seq(new ARecord("donkeys.com", Ttl.fromPositive(1.second), InetAddress.getByName("127.0.0.4"))))
val result =
DnsServiceDiscovery.srvRecordsToResolved("cats.com", resolved)
result shouldEqual Resolved("cats.com", im.Seq(ResolvedTarget("kittens.com", Some(8080), None)))
}
"fill in ips from AAAA records" in {
val resolved = DnsProtocol.Resolved("cats.com", im.Seq(new SRVRecord("cats1.com", Ttl.fromPositive(1.second), 2, 3, 4, "kittens.com")),
im.Seq(
new AAAARecord("kittens.com", Ttl.fromPositive(2.seconds), InetAddress.getByName("::1").asInstanceOf[Inet6Address]),
new AAAARecord("kittens.com", Ttl.fromPositive(2.seconds), InetAddress.getByName("::2").asInstanceOf[Inet6Address]),
new AAAARecord("donkeys.com", Ttl.fromPositive(2.seconds), InetAddress.getByName("::3").asInstanceOf[Inet6Address])
))
val result: ServiceDiscovery.Resolved =
DnsServiceDiscovery.srvRecordsToResolved("cats.com", resolved)
result.serviceName shouldEqual "cats.com"
result.addresses.toSet shouldEqual Set(
ResolvedTarget("kittens.com", Some(4), Some(InetAddress.getByName("::1"))),
ResolvedTarget("kittens.com", Some(4), Some(InetAddress.getByName("::2")))
)
}
}
}

View file

@ -0,0 +1,34 @@
/*
* Copyright (C) 2017-2018 Lightbend Inc. <https://www.lightbend.com>
*/
package doc.akka.discovery
import akka.actor.ActorSystem
import akka.discovery.{ Discovery, Lookup, ServiceDiscovery }
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
object CompileOnlySpec {
//#loading
val system = ActorSystem()
val serviceDiscovery = Discovery(system).discovery
//#loading
//#basic
serviceDiscovery.lookup(Lookup("akka.io"), 1.second)
// Convenience for a Lookup with only a serviceName
serviceDiscovery.lookup("akka.io", 1.second)
//#basic
//#full
val lookup: Future[ServiceDiscovery.Resolved] = serviceDiscovery.lookup(Lookup("akka.io").withPortName("remoting").withProtocol("tcp"), 1.second)
//#full
// compiler
lookup.foreach(println)
}

View file

@ -0,0 +1,245 @@
# Discovery
@@@ warning
This module is currently marked as @ref:[may change](../common/may-change.md)
This means that API or semantics can change without warning or deprecation period.
@@@
Akka Discovery provides an interface around various ways of locating services. The built in methods are:
* Configuration
* DNS
* Aggregate
In addition [Akka Management](https://developer.lightbend.com/docs/akka-management/current/) contains methods such as:
* Kubernetes API
* AWS
* Consul
* Marathon API
Discovery used to be part of Akka Management but has become an Akka module as of `2.5.19` of Akka and version `0.21.0`
of Akka Management. If you're also using Akka Management for other service discovery methods or bootstrap make
sure you are using at least version `0.21.0`.
## Dependency
@@dependency[sbt,Gradle,Maven] {
group="com.typesafe.akka"
artifact="akka-discovery_$scala.binary_version$"
version="$akka.version$"
}
## How it works
Loading the extension:
Scala
: @@snip [CompileOnlySpec.scala](/akka-discovery/src/test/scala/doc/akka/discovery/CompileOnlySpec.scala) { #loading }
Java
: @@snip [CompileOnlyTest.java](/akka-discovery/src/test/java/jdoc/akka/discovery/CompileOnlyTest.java) { #loading }
A `Lookup` contains a mandatory `serviceName` and an optional `portName` and `protocol`. How these are interpreted is discovery
method dependent e.g.DNS does an A/AAAA record query if any of the fields are missing and an SRV query for a full look up:
Scala
: @@snip [CompileOnlySpec.scala](/akka-discovery/src/test/scala/doc/akka/discovery/CompileOnlySpec.scala) { #basic }
Java
: @@snip [CompileOnlyTest.java](/akka-discovery/src/test/java/jdoc/akka/discovery/CompileOnlyTest.java) { #basic }
`portName` and `protocol` are optional and their meaning is interpreted by the method.
Scala
: @@snip [CompileOnlySpec.scala](/akka-discovery/src/test/scala/doc/akka/discovery/CompileOnlySpec.scala) { #full }
Java
: @@snip [CompileOnlyTest.java](/akka-discovery/src/test/java/jdoc/akka/discovery/CompileOnlyTest.java) { #full }
Port can be used when a service opens multiple ports e.g. a HTTP port and an Akka remoting port.
## Discovery Method: DNS
DNS discovery maps `Lookup` queries as follows:
* `serviceName`, `portName` and `protocol` set: SRV query in the form: `_port._protocol._name` Where the `_`s are added.
* Any query missing any of the fields is mapped to a A/AAAA query for the `serviceName`
The mapping between Akka service discovery terminology and SRV terminology:
* SRV service = port
* SRV name = serviceName
* SRV protocol = protocol
Configure `akka-dns` to be used as the discovery implementation in your `application.conf`:
@@snip[application.conf](/akka-discovery/src/test/scala/akka/discovery/dns/DnsDiscoverySpec.scala){ #configure-dns }
From there on, you can use the generic API that hides the fact which discovery method is being used by calling::
Scala
: ```scala
import akka.discovery.ServiceDiscovery
val system = ActorSystem("Example")
// ...
val discovery = ServiceDiscovery(system).discovery
val result: Future[Resolved] = discovery.lookup("service-name", resolveTimeout = 500 milliseconds)
```
Java
: ```java
import akka.discovery.ServiceDiscovery;
ActorSystem system = ActorSystem.create("Example");
// ...
SimpleServiceDiscovery discovery = ServiceDiscovery.get(system).discovery();
Future<SimpleServiceDiscovery.Resolved> result = discovery.lookup("service-name", Duration.create("500 millis"));
```
### How it works
DNS discovery will use either A/AAAA records or SRV records depending on whether a `Simple` or `Full` lookup is issued..
The advantage of SRV records is that they can include a port.
#### SRV records
Lookups with all the fields set become SRV queries. For example:
```
dig srv _service._tcp.akka.test
; <<>> DiG 9.11.3-RedHat-9.11.3-6.fc28 <<>> srv service.tcp.akka.test
;; global options: +cmd
;; Got answer:
;; ->>HEADER<<- opcode: QUERY, status: NOERROR, id: 60023
;; flags: qr aa rd ra; QUERY: 1, ANSWER: 2, AUTHORITY: 1, ADDITIONAL: 5
;; OPT PSEUDOSECTION:
; EDNS: version: 0, flags:; udp: 4096
; COOKIE: 5ab8dd4622e632f6190f54de5b28bb8fb1b930a5333c3862 (good)
;; QUESTION SECTION:
;service.tcp.akka.test. IN SRV
;; ANSWER SECTION:
_service._tcp.akka.test. 86400 IN SRV 10 60 5060 a-single.akka.test.
_service._tcp.akka.test. 86400 IN SRV 10 40 5070 a-double.akka.test.
```
In this case `service.tcp.akka.test` resolves to `a-single.akka.test` on port `5060`
and `a-double.akka.test` on port `5070`. Currently discovery does not support the weightings.
#### A/AAAA records
Lookups with any fields missing become A/AAAA record queries. For example:
```
dig a-double.akka.test
; <<>> DiG 9.11.3-RedHat-9.11.3-6.fc28 <<>> a-double.akka.test
;; global options: +cmd
;; Got answer:
;; ->>HEADER<<- opcode: QUERY, status: NOERROR, id: 11983
;; flags: qr aa rd ra; QUERY: 1, ANSWER: 2, AUTHORITY: 1, ADDITIONAL: 2
;; OPT PSEUDOSECTION:
; EDNS: version: 0, flags:; udp: 4096
; COOKIE: 16e9815d9ca2514d2f3879265b28bad05ff7b4a82721edd0 (good)
;; QUESTION SECTION:
;a-double.akka.test. IN A
;; ANSWER SECTION:
a-double.akka.test. 86400 IN A 192.168.1.21
a-double.akka.test. 86400 IN A 192.168.1.22
```
In this case `a-double.akka.test` would resolve to `192.168.1.21` and `192.168.1.22`.
## Discovery Method: Configuration
Configuration currently ignores all fields apart from service name.
For simple use cases configuration can be used for service discovery. The advantage of using Akka Discovery with
configuration rather than your own configuration values is that applications can be migrated to a more
sophisticated discovery method without any code changes.
Configure it to be used as discovery method in your `application.conf`
```
akka {
discovery.method = config
}
```
By default the services discoverable are defined in `akka.discovery.config.services` and have the following format:
```
akka.discovery.config.services = {
service1 = {
endpoints = [
{
host = "cat"
port = 1233
},
{
host = "dog"
port = 1234
}
]
},
service2 = {
endpoints = []
}
}
```
Where the above block defines two services, `service1` and `service2`.
Each service can have multiple endpoints.
## Discovery Method: Aggregate multiple discovery methods
Aggregate discovery allows multiple discovery methods to be aggregated e.g. try and resolve
via DNS and fall back to configuration.
To use aggregate discovery add its dependency as well as all of the discovery that you
want to aggregate.
Configure `aggregate` as `akka.discovery.method` and which discovery methods are tried and in which order.
```
akka {
discovery {
method = aggregate
aggregate {
discovery-methods = ["akka-dns", "config"]
}
config {
services {
service1 {
endpoints [
{
host = "host1"
port = 1233
},
{
host = "host2"
port = 1234
}
]
}
}
}
}
}
```
The above configuration will result in `akka-dns` first being checked and if it fails or returns no
targets for the given service name then `config` is queried which i configured with one service called
`service1` which two hosts `host1` and `host2`.

View file

@ -12,6 +12,7 @@
* [index-cluster](index-cluster.md)
* [stream/index](stream/index.md)
* [index-network](index-network.md)
* [discovery](discovery/index.md)
* [index-futures](index-futures.md)
* [index-utilities](index-utilities.md)
* [common/other-modules](common/other-modules.md)

View file

@ -45,7 +45,8 @@ lazy val aggregatedProjects: Seq[ProjectReference] = Seq(
actorTyped, actorTypedTests, actorTestkitTyped,
persistenceTyped,
clusterTyped, clusterShardingTyped,
streamTyped
streamTyped,
discovery
)
lazy val root = Project(
@ -479,6 +480,16 @@ lazy val actorTypedTests = akkaModule("akka-actor-typed-tests")
.disablePlugins(MimaPlugin)
.enablePlugins(NoPublish)
lazy val discovery = akkaModule("akka-discovery")
.dependsOn(
actor,
testkit % "test->test",
actorTests % "test->test"
)
.settings(Dependencies.discovery)
.settings(AkkaBuild.mayChangeSettings)
.settings(AutomaticModuleName.settings("akka.discovery"))
.settings(OSGi.discovery)
def akkaModule(name: String): Project =
Project(id = name, base = file(name))

View file

@ -148,6 +148,8 @@ object Dependencies {
val actor = l ++= Seq(config, java8Compat.value)
val discovery = l ++= Seq(Test.junit, Test.scalatest.value)
val testkit = l ++= Seq(Test.junit, Test.scalatest.value) ++ Test.metricsAll
val actorTests = l ++= Seq(

View file

@ -24,7 +24,8 @@ object MiMa extends AutoPlugin {
val versions: Seq[String] = {
val akka24NoStreamVersions = Seq("2.4.0", "2.4.1")
val akka25Versions = (0 to latestPatchOf25).map(patch s"2.5.$patch")
val akka24StreamVersions = (2 to 12) map ("2.4." + _)
val akka24StreamVersions = (2 to 12).map("2.4." + _)
val akka25DiscoveryVersions = (19 to latestPatchOf25).map(patch => s"2.5.$patch")
val akka24WithScala212 =
(13 to latestPatchOf24)
.map("2.4." + _)
@ -35,17 +36,24 @@ object MiMa extends AutoPlugin {
"akka-stream-testkit")
val akka250NewArtifacts = Seq(
"akka-persistence-query")
val akka2519NewArtifacts = Seq(
"akka-discovery"
)
scalaBinaryVersion match {
case "2.11"
if (akka250NewArtifacts.contains(projectName)) akka25Versions
if (akka2519NewArtifacts.contains(projectName))
akka25DiscoveryVersions
else if (akka250NewArtifacts.contains(projectName)) akka25Versions
else {
if (!akka242NewArtifacts.contains(projectName)) akka24NoStreamVersions
else Seq.empty
} ++ akka24StreamVersions ++ akka24WithScala212 ++ akka25Versions
case "2.12"
if (akka250NewArtifacts.contains(projectName))
if (akka2519NewArtifacts.contains(projectName))
akka25DiscoveryVersions
else if (akka250NewArtifacts.contains(projectName))
akka25Versions
else
akka24WithScala212 ++ akka25Versions

View file

@ -122,6 +122,8 @@ object OSGi {
val testkit = exports(Seq("akka.testkit.*"))
val discovery = exports(Seq("akka.discovery.*"))
val osgiOptionalImports = Seq(
// needed because testkit is normally not used in the application bundle,
// but it should still be included as transitive dependency and used by BundleDelegatingClassLoader