diff --git a/akka-actor-tests/src/test/bind/etc/db.foo.test b/akka-actor-tests/src/test/bind/etc/db.foo.test index ce39716ace..d64f28e3e4 100755 --- a/akka-actor-tests/src/test/bind/etc/db.foo.test +++ b/akka-actor-tests/src/test/bind/etc/db.foo.test @@ -72,8 +72,8 @@ many in AAAA 2001:985:965:1:ba27:ebff:fe5f:9d2d many in AAAA 2001:985:965:1:ba27:ebff:fe5f:9d2e many in AAAA 2001:985:965:1:ba27:ebff:fe5f:9d2f -service.tcp 86400 IN SRV 10 65534 5060 a-single -service.tcp 86400 IN SRV 65533 40 65535 a-double +_service._tcp 86400 IN SRV 10 65534 5060 a-single +_service._tcp 86400 IN SRV 65533 40 65535 a-double cname-in IN CNAME a-double cname-ext IN CNAME a-single.bar.example. diff --git a/akka-actor-tests/src/test/scala/akka/io/dns/AsyncDnsResolverIntegrationSpec.scala b/akka-actor-tests/src/test/scala/akka/io/dns/AsyncDnsResolverIntegrationSpec.scala index bb5cdfae5e..03f0b43f3a 100644 --- a/akka-actor-tests/src/test/scala/akka/io/dns/AsyncDnsResolverIntegrationSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/dns/AsyncDnsResolverIntegrationSpec.scala @@ -125,13 +125,13 @@ class AsyncDnsResolverIntegrationSpec extends AkkaSpec( } "resolve SRV record" in { - val name = "service.tcp.foo.test" - val answer = resolve("service.tcp.foo.test", Srv) + val name = "_service._tcp.foo.test" + val answer = resolve(name, Srv) answer.name shouldEqual name answer.records.collect { case r: SRVRecord ⇒ r }.toSet shouldEqual Set( - SRVRecord("service.tcp.foo.test", Ttl.fromPositive(86400.seconds), 10, 65534, 5060, "a-single.foo.test"), - SRVRecord("service.tcp.foo.test", Ttl.fromPositive(86400.seconds), 65533, 40, 65535, "a-double.foo.test") + SRVRecord("_service._tcp.foo.test", Ttl.fromPositive(86400.seconds), 10, 65534, 5060, "a-single.foo.test"), + SRVRecord("_service._tcp.foo.test", Ttl.fromPositive(86400.seconds), 65533, 40, 65535, "a-double.foo.test") ) } diff --git a/akka-actor/src/main/mima-filters/2.5.18.backwards.excludes b/akka-actor/src/main/mima-filters/2.5.18.backwards.excludes index df53d665f4..db14a2879c 100644 --- a/akka-actor/src/main/mima-filters/2.5.18.backwards.excludes +++ b/akka-actor/src/main/mima-filters/2.5.18.backwards.excludes @@ -1,3 +1,6 @@ +# Dns discovery custom resolver #25937 - Internal API +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.io.dns.internal.AsyncDnsManager.ext") + # Replaces DNS TTL primitive types with Duration #25850 ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.io.SimpleDnsCache.put") ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.io.SimpleDnsCache#Cache.put") @@ -54,4 +57,3 @@ ProblemFilters.exclude[DirectMissingMethodProblem]("akka.io.TcpConnection.abort" ProblemFilters.exclude[DirectMissingMethodProblem]("akka.io.ChannelRegistration.cancel") ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.io.ChannelRegistration.cancelAndClose") ProblemFilters.exclude[DirectMissingMethodProblem]("akka.io.SelectionHandler#ChannelRegistryImpl.this") - diff --git a/akka-actor/src/main/scala/akka/io/Dns.scala b/akka-actor/src/main/scala/akka/io/Dns.scala index a5a1e895ab..0fb1f2faf3 100644 --- a/akka-actor/src/main/scala/akka/io/Dns.scala +++ b/akka-actor/src/main/scala/akka/io/Dns.scala @@ -5,11 +5,13 @@ package akka.io import java.net.{ Inet4Address, Inet6Address, InetAddress, UnknownHostException } +import java.util.concurrent.ConcurrentHashMap import akka.actor._ +import akka.annotation.InternalApi import akka.routing.ConsistentHashingRouter.ConsistentHashable import com.typesafe.config.Config - +import java.util.function.{ Function ⇒ JFunction } import scala.collection.{ breakOut, immutable } abstract class Dns { @@ -87,32 +89,77 @@ object Dns extends ExtensionId[DnsExt] with ExtensionIdProvider { override def get(system: ActorSystem): DnsExt = super.get(system) } -class DnsExt(val system: ExtendedActorSystem) extends IO.Extension { +class DnsExt private[akka] (val system: ExtendedActorSystem, resolverName: String, managerName: String) extends IO.Extension { - val Settings = new Settings(system.settings.config.getConfig("akka.io.dns")) + private val asyncDns = new ConcurrentHashMap[String, ActorRef] - class Settings private[DnsExt] (_config: Config) { + /** + * INTERNAL API + * + * Load an additional async-dns resolver. Can be used to use async-dns even if inet-resolver is the configuerd + * default. + * Intentionally chosen not to support loading an arbitrary resolver as it required a specific constructor + * for the manager actor. The expected constructor for DNS plugins is just to take in a DnsExt which can't + * be used in this case + */ + @InternalApi + private[akka] def loadAsyncDns(managerName: String): ActorRef = { + // This can't pass in `this` as then AsyncDns would pick up the system settings + asyncDns.computeIfAbsent(managerName, new JFunction[String, ActorRef] { + override def apply(r: String): ActorRef = { + val settings = new Settings(system.settings.config.getConfig("akka.io.dns"), "async-dns") + val provider = system.dynamicAccess.getClassFor[DnsProvider](settings.ProviderObjectName).get.newInstance() + system.log.info("Creating async dns resolver {} with manager name {}", settings.Resolver, managerName) + system.systemActorOf( + props = Props(provider.managerClass, settings.Resolver, system, settings.ResolverConfig, provider.cache, settings.Dispatcher, provider).withDeploy(Deploy.local).withDispatcher(settings.Dispatcher), + name = managerName) + } + }) + } - import _config._ + /** + * INTERNAL API + * + * Use IO(DNS) or Dns(system). Do not instantiate directly + * + * For binary compat as DnsExt constructor didn't used to have internal API on + */ + @InternalApi + def this(system: ExtendedActorSystem) = this(system, system.settings.config.getString("akka.io.dns.resolver"), "IO-DNS") - val Dispatcher: String = getString("dispatcher") - val Resolver: String = getString("resolver") - val ResolverConfig: Config = getConfig(Resolver) + class Settings private[DnsExt] (config: Config, resolverName: String) { + /** + * Load the default resolver + */ + def this(config: Config) = this(config, config.getString("resolver")) + + val Dispatcher: String = config.getString("dispatcher") + val Resolver: String = resolverName + val ResolverConfig: Config = config.getConfig(Resolver) val ProviderObjectName: String = ResolverConfig.getString("provider-object") override def toString = s"Settings($Dispatcher, $Resolver, $ResolverConfig, $ProviderObjectName)" } + // Settings for the system resolver + val Settings: Settings = new Settings(system.settings.config.getConfig("akka.io.dns"), resolverName) + + // System DNS resolver val provider: DnsProvider = system.dynamicAccess.getClassFor[DnsProvider](Settings.ProviderObjectName).get.newInstance() + + // System DNS cache val cache: Dns = provider.cache + // System DNS manager val manager: ActorRef = { system.systemActorOf( props = Props(provider.managerClass, this).withDeploy(Deploy.local).withDispatcher(Settings.Dispatcher), - name = "IO-DNS") + name = managerName) } + // System DNS manager def getResolver: ActorRef = manager + } object IpVersionSelector { diff --git a/akka-actor/src/main/scala/akka/io/dns/internal/AsyncDnsManager.scala b/akka-actor/src/main/scala/akka/io/dns/internal/AsyncDnsManager.scala index c553f80828..723147921e 100644 --- a/akka-actor/src/main/scala/akka/io/dns/internal/AsyncDnsManager.scala +++ b/akka-actor/src/main/scala/akka/io/dns/internal/AsyncDnsManager.scala @@ -7,14 +7,15 @@ package akka.io.dns.internal import java.net.InetSocketAddress import java.util.concurrent.TimeUnit -import akka.actor.{ Actor, ActorLogging, ActorRefFactory, Deploy, Props, Timers } +import akka.actor.{ Actor, ActorLogging, ActorRefFactory, Deploy, ExtendedActorSystem, Props, Timers } import akka.annotation.InternalApi import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics } import akka.io.dns.{ AAAARecord, ARecord, DnsProtocol, DnsSettings } import akka.io.dns.internal.AsyncDnsManager.CacheCleanup -import akka.io.{ Dns, DnsExt, PeriodicCacheCleanup } +import akka.io.{ Dns, DnsExt, DnsProvider, PeriodicCacheCleanup } import akka.routing.FromConfig import akka.util.Timeout +import com.typesafe.config.Config import scala.concurrent.duration.Duration @@ -30,33 +31,36 @@ private[io] object AsyncDnsManager { * INTERNAL API */ @InternalApi -private[io] final class AsyncDnsManager(val ext: DnsExt) extends Actor +private[io] final class AsyncDnsManager(name: String, system: ExtendedActorSystem, resolverConfig: Config, cache: Dns, dispatcher: String, provider: DnsProvider) extends Actor with RequiresMessageQueue[UnboundedMessageQueueSemantics] with ActorLogging with Timers { import akka.pattern.ask import akka.pattern.pipe + /** + * Ctr expected by the DnsExt for all DnsMangers + */ + def this(ext: DnsExt) = this(ext.Settings.Resolver, ext.system, ext.Settings.ResolverConfig, ext.cache, ext.Settings.Dispatcher, ext.provider) + implicit val ec = context.dispatcher - private var oldProtocolWarningLoggedTimes = 0 - - val settings = new DnsSettings(ext.system, ext.Settings.ResolverConfig) + val settings = new DnsSettings(system, resolverConfig) implicit val timeout = Timeout(settings.ResolveTimeout) private val resolver = { - val props: Props = FromConfig.props(Props(ext.provider.actorClass, settings, ext.cache, (factory: ActorRefFactory, dns: List[InetSocketAddress]) ⇒ { + val props: Props = FromConfig.props(Props(provider.actorClass, settings, cache, (factory: ActorRefFactory, dns: List[InetSocketAddress]) ⇒ { dns.map(ns ⇒ factory.actorOf(Props(new DnsClient(ns)))) - }).withDeploy(Deploy.local).withDispatcher(ext.Settings.Dispatcher)) - context.actorOf(props, ext.Settings.Resolver) + }).withDeploy(Deploy.local).withDispatcher(dispatcher)) + context.actorOf(props, name) } - private val cacheCleanup = ext.cache match { + private val cacheCleanup = cache match { case cleanup: PeriodicCacheCleanup ⇒ Some(cleanup) case _ ⇒ None } override def preStart(): Unit = { cacheCleanup.foreach { _ ⇒ - val interval = Duration(ext.Settings.ResolverConfig.getDuration("cache-cleanup-interval", TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS) + val interval = Duration(resolverConfig.getDuration("cache-cleanup-interval", TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS) timers.startPeriodicTimer(CacheCleanup, CacheCleanup, interval) } } diff --git a/akka-bench-jmh/README.md b/akka-bench-jmh/README.md index a37cc73e31..c40f222d25 100644 --- a/akka-bench-jmh/README.md +++ b/akka-bench-jmh/README.md @@ -7,4 +7,4 @@ You can run them like: project akka-bench-jmh jmh:run -i 3 -wi 3 -f 1 .*ActorCreationBenchmark -Use 'jmh:run -h' to get an overview of the availabe options. +Use 'jmh:run -h' to get an overview of the available options. diff --git a/akka-discovery/src/main/resources/reference.conf b/akka-discovery/src/main/resources/reference.conf new file mode 100644 index 0000000000..176d8bd374 --- /dev/null +++ b/akka-discovery/src/main/resources/reference.conf @@ -0,0 +1,75 @@ +###################################################### +# Akka Discovery Config # +###################################################### + +akka.actor.deployment { + "/SD-DNS/async-dns" { + mailbox = "unbounded" + router = "round-robin-pool" + nr-of-instances = 1 + } +} + +akka.discovery { + + # Users MUST configure this value to set the default discovery method. + # + # The value can be an implementation config path name, such as "akka-dns", + # which would attempt to resolve as `akka.discovery.akka-dns` which is expected + # to contain a `class` setting. As fallback, the root `akka-dns` setting scope + # would be used. If none of those contained a `class` setting, then the value is + # assumed to be a class name, and an attempt is made to instantiate it. + method = "" + + # Config based service discovery + config { + class = akka.discovery.config.ConfigServiceDiscovery + + # Location of the services in configuration + services-path = "akka.discovery.config.services" + + # A map of services to resolve from configuration. + # See docs for more examples. + # A list of endpoints with host/port where port is optional e.g. + # services { + # service1 { + # endpoints = [ + # { + # host = "cat.com" + # port = 1233 + # }, + # { + # host = "dog.com" + # } + # ] + # }, + # service2 { + # endpoints = [ + # { + # host = "fish.com" + # port = 1233 + # } + # ] + # } + # } + services = { + + } + } + + # Aggregate multiple service discovery mechaisms + aggregate { + class = akka.discovery.aggregate.AggregateServiceDiscovery + + # List of service discovery methods to try in order. E.g DNS then fall back to config + # ["akka-dns" , "confg" ] + discovery-methods = [] + + } + + # DNS based service discovery + akka-dns { + class = akka.discovery.dns.DnsServiceDiscovery + } +} + diff --git a/akka-discovery/src/main/scala/akka/discovery/Discovery.scala b/akka-discovery/src/main/scala/akka/discovery/Discovery.scala new file mode 100644 index 0000000000..b47ab9d77f --- /dev/null +++ b/akka-discovery/src/main/scala/akka/discovery/Discovery.scala @@ -0,0 +1,119 @@ +/* + * Copyright (C) 2017-2018 Lightbend Inc. + */ + +package akka.discovery + +import java.util.concurrent.ConcurrentHashMap +import java.util.function.{ Function ⇒ JFunction } + +import akka.actor._ +import akka.annotation.InternalApi + +import scala.util.{ Failure, Success, Try } + +final class Discovery(implicit system: ExtendedActorSystem) extends Extension { + + Discovery.checkClassPathForOldDiscovery(system) + + private val implementations = new ConcurrentHashMap[String, ServiceDiscovery] + private val factory = new JFunction[String, ServiceDiscovery] { + override def apply(method: String): ServiceDiscovery = createServiceDiscovery(method) + } + + private lazy val _defaultImplMethod = + system.settings.config.getString("akka.discovery.method") match { + case "" ⇒ + throw new IllegalArgumentException( + "No default service discovery implementation configured in " + + "`akka.discovery.method`. Make sure to configure this setting to your preferred implementation such as " + + "'akka-dns' in your application.conf (from the akka-discovery module).") + case method ⇒ method + } + + private lazy val defaultImpl = loadServiceDiscovery(_defaultImplMethod) + + /** + * Default [[ServiceDiscovery]] as configured in `akka.discovery.method`. + */ + @throws[IllegalArgumentException] + def discovery: ServiceDiscovery = defaultImpl + + /** + * Create a [[ServiceDiscovery]] from configuration property. + * The given `method` parameter is used to find configuration property + * "akka.discovery.[method].class" or "[method].class". `method` can also + * be a fully class name. + * + * The `ServiceDiscovery` instance for a given `method` will be created + * once and subsequent requests for the same `method` will return the same instance. + */ + def loadServiceDiscovery(method: String): ServiceDiscovery = { + implementations.computeIfAbsent(method, factory) + } + + /** + * INTERNAL API + */ + @InternalApi + private def createServiceDiscovery(method: String): ServiceDiscovery = { + val config = system.settings.config + val dynamic = system.dynamicAccess + + def classNameFromConfig(path: String): String = + if (config.hasPath(path)) config.getString(path) + else throw new IllegalArgumentException(s"$path must contain field `class` that is a FQN of a `akka.discovery.ServiceDiscovery` implementation") + + def create(clazzName: String): Try[ServiceDiscovery] = { + dynamic + .createInstanceFor[ServiceDiscovery](clazzName, (classOf[ExtendedActorSystem] → system) :: Nil) + .recoverWith { + case _: ClassNotFoundException | _: NoSuchMethodException ⇒ + dynamic.createInstanceFor[ServiceDiscovery](clazzName, (classOf[ActorSystem] → system) :: Nil) + } + .recoverWith { + case _: ClassNotFoundException | _: NoSuchMethodException ⇒ + dynamic.createInstanceFor[ServiceDiscovery](clazzName, Nil) + } + } + + val configName = "akka.discovery." + method + ".class" + val instanceTry = create(classNameFromConfig(configName)) + + instanceTry match { + case Failure(e @ (_: ClassNotFoundException | _: NoSuchMethodException)) ⇒ + throw new IllegalArgumentException( + s"Illegal [$configName] value or incompatible class! " + + "The implementation class MUST extend akka.discovery.ServiceDiscovery and take an " + + "ExtendedActorSystem as constructor argument.", e) + case Failure(e) ⇒ throw e + case Success(instance) ⇒ instance + } + + } + +} + +object Discovery extends ExtensionId[Discovery] with ExtensionIdProvider { + override def apply(system: ActorSystem): Discovery = super.apply(system) + + override def lookup: Discovery.type = Discovery + + override def get(system: ActorSystem): Discovery = super.get(system) + + override def createExtension(system: ExtendedActorSystem): Discovery = new Discovery()(system) + + /** + * INTERNAL API + */ + @InternalApi + private[akka] def checkClassPathForOldDiscovery(system: ExtendedActorSystem): Unit = { + try { + system.dynamicAccess.getClassFor("akka.discovery.SimpleServiceDiscovery").get + throw new RuntimeException("Old version of Akka Discovery from Akka Management found on the classpath. Remove `com.lightbend.akka.discovery:akka-discovery` from the classpath..") + } catch { + case _: ClassNotFoundException ⇒ // all good + } + } + +} diff --git a/akka-discovery/src/main/scala/akka/discovery/ServiceDiscovery.scala b/akka-discovery/src/main/scala/akka/discovery/ServiceDiscovery.scala new file mode 100644 index 0000000000..eea947ee46 --- /dev/null +++ b/akka-discovery/src/main/scala/akka/discovery/ServiceDiscovery.scala @@ -0,0 +1,173 @@ +/* + * Copyright (C) 2017-2018 Lightbend Inc. + */ + +package akka.discovery + +import java.net.InetAddress +import java.util.Optional +import java.util.concurrent.CompletionStage +import java.util.concurrent.TimeUnit + +import akka.actor.DeadLetterSuppression +import akka.annotation.ApiMayChange + +import scala.collection.immutable +import scala.concurrent.Future +import scala.concurrent.duration.FiniteDuration +import scala.util.Try + +@ApiMayChange +object ServiceDiscovery { + + /** Result of a successful resolve request */ + final case class Resolved(serviceName: String, addresses: immutable.Seq[ResolvedTarget]) + extends DeadLetterSuppression { + + /** + * Java API + */ + def getAddresses: java.util.List[ResolvedTarget] = { + import scala.collection.JavaConverters._ + addresses.asJava + } + } + + object ResolvedTarget { + // Simply compare the bytes of the address. + // This may not work in exotic cases such as IPv4 addresses encoded as IPv6 addresses. + private implicit val inetAddressOrdering: Ordering[InetAddress] = + Ordering.by[InetAddress, Iterable[Byte]](_.getAddress) + + implicit val addressOrdering: Ordering[ResolvedTarget] = Ordering.by { t ⇒ + (t.address, t.host, t.port) + } + + def apply(host: String, port: Option[Int]): ResolvedTarget = + ResolvedTarget(host, port, Try(InetAddress.getByName(host)).toOption) + } + + /** + * Resolved target host, with optional port and the IP address. + * @param host the hostname or the IP address of the target + * @param port optional port number + * @param address optional IP address of the target. This is used during cluster bootstap when available. + */ + final case class ResolvedTarget( + host: String, + port: Option[Int], + address: Option[InetAddress] + ) { + + /** + * Java API + */ + def getPort: Optional[Int] = { + import scala.compat.java8.OptionConverters._ + port.asJava + } + + /** + * Java API + */ + def getAddress: Optional[InetAddress] = { + import scala.compat.java8.OptionConverters._ + address.asJava + } + } + +} + +/** + * A service lookup. It is up to each method to decide + * what to do with the optional portName and protocol fields. + * For example `portName` could be used to distinguish between + * Akka remoting ports and HTTP ports. + * + */ +@ApiMayChange +final case class Lookup(serviceName: String, portName: Option[String], protocol: Option[String]) { + + /** + * Which port for a service e.g. Akka remoting or HTTP. + * Maps to "service" for an SRV records. + */ + def withPortName(value: String): Lookup = copy(portName = Some(value)) + + /** + * Which protocol e.g. TCP or UDP. + * Maps to "protocol" for SRV records. + */ + def withProtocol(value: String): Lookup = copy(protocol = Some(value)) +} + +@ApiMayChange +case object Lookup { + + /** + * Create a service Lookup with only a serviceName. + * Use withPortName and withProtocol to provide optional portName + * and protocol + */ + def apply(serviceName: String): Lookup = new Lookup(serviceName, None, None) + + /** + * Java API + * + * Create a service Lookup with only a serviceName. + * Use withPortName and withProtocol to provide optional portName + * and protocol + */ + def create(serviceName: String): Lookup = new Lookup(serviceName, None, None) +} + +/** + * Implement to provide a service discovery method + * + */ +@ApiMayChange +abstract class ServiceDiscovery { + + import ServiceDiscovery._ + + /** + * Scala API: Perform lookup using underlying discovery implementation. + * + * @param lookup A service discovery lookup. + * @param resolveTimeout Timeout. Up to the discovery-method to adhere to his + */ + def lookup(lookup: Lookup, resolveTimeout: FiniteDuration): Future[Resolved] + + /** + * Scala API: Perform lookup using underlying discovery implementation. + * + * Convenience for when only a name is required. + */ + def lookup(serviceName: String, resolveTimeout: FiniteDuration): Future[Resolved] = + lookup(Lookup(serviceName), resolveTimeout) + + /** + * Java API: Perform basic lookup using underlying discovery implementation. + * + * While the implementation may provide other settings and ways to configure timeouts, + * the passed `resolveTimeout` should never be exceeded, as it signals the application's + * eagerness to wait for a result for this specific lookup. + * + * The returned future SHOULD be failed once resolveTimeout has passed. + * + */ + def lookup(query: Lookup, resolveTimeout: java.time.Duration): CompletionStage[Resolved] = { + import scala.compat.java8.FutureConverters._ + lookup(query, FiniteDuration(resolveTimeout.toMillis, TimeUnit.MILLISECONDS)).toJava + } + + /** + * Java API + * + * @param serviceName A name, see discovery-method's docs for how this is interpreted + * @param resolveTimeout Timeout. Up to the discovery-methodto adhere to his + */ + def lookup(serviceName: String, resolveTimeout: java.time.Duration): CompletionStage[Resolved] = + lookup(Lookup(serviceName), resolveTimeout) + +} diff --git a/akka-discovery/src/main/scala/akka/discovery/aggregate/AggregateServiceDiscovery.scala b/akka-discovery/src/main/scala/akka/discovery/aggregate/AggregateServiceDiscovery.scala new file mode 100644 index 0000000000..d26962d1a7 --- /dev/null +++ b/akka-discovery/src/main/scala/akka/discovery/aggregate/AggregateServiceDiscovery.scala @@ -0,0 +1,93 @@ +/* + * Copyright (C) 2017-2018 Lightbend Inc. + */ + +package akka.discovery.aggregate + +import akka.actor.ExtendedActorSystem +import akka.annotation.InternalApi +import akka.discovery.ServiceDiscovery.Resolved +import akka.discovery.aggregate.AggregateServiceDiscovery.Methods +import akka.discovery.{ Discovery, Lookup, ServiceDiscovery } +import akka.event.Logging +import akka.util.Helpers.Requiring +import com.typesafe.config.Config + +import scala.collection.JavaConverters._ +import scala.concurrent.Future +import scala.concurrent.duration.FiniteDuration +import scala.util.control.NonFatal + +/** + * INTERNAL API + */ +@InternalApi +private final class AggregateServiceDiscoverySettings(config: Config) { + + val discoveryMethods = config + .getStringList("discovery-methods") + .asScala + .toList + .requiring(_.nonEmpty, "At least one discovery method should be specified") + +} + +/** + * INTERNAL API + */ +@InternalApi +private object AggregateServiceDiscovery { + type Methods = List[(String, ServiceDiscovery)] +} + +/** + * INTERNAL API + */ +@InternalApi +private[akka] final class AggregateServiceDiscovery(system: ExtendedActorSystem) extends ServiceDiscovery { + + private val log = Logging(system, getClass) + + private val settings = + new AggregateServiceDiscoverySettings(system.settings.config.getConfig("akka.discovery.aggregate")) + + private val methods = { + val serviceDiscovery = Discovery(system) + settings.discoveryMethods.map(mech ⇒ (mech, serviceDiscovery.loadServiceDiscovery(mech))) + } + private implicit val ec = system.dispatcher + + /** + * Each discovery method is given the resolveTimeout rather than reducing it each time between methods. + */ + override def lookup(lookup: Lookup, resolveTimeout: FiniteDuration): Future[Resolved] = + resolve(methods, lookup, resolveTimeout) + + private def resolve(sds: Methods, query: Lookup, resolveTimeout: FiniteDuration): Future[Resolved] = { + sds match { + case (method, next) :: Nil ⇒ + log.debug("Looking up [{}] with [{}]", query, method) + next.lookup(query, resolveTimeout) + case (method, next) :: tail ⇒ + log.debug("Looking up [{}] with [{}]", query, method) + // If nothing comes back then try the next one + next + .lookup(query, resolveTimeout) + .flatMap { resolved ⇒ + if (resolved.addresses.isEmpty) { + log.debug("Method[{}] returned no ResolvedTargets, trying next", query) + resolve(tail, query, resolveTimeout) + } else + Future.successful(resolved) + } + .recoverWith { + case NonFatal(t) ⇒ + log.error(t, "[{}] Service discovery failed. Trying next discovery method", method) + resolve(tail, query, resolveTimeout) + } + case Nil ⇒ + // this is checked in `discoveryMethods`, but silence compiler warning + throw new IllegalStateException("At least one discovery method should be specified") + } + } +} diff --git a/akka-discovery/src/main/scala/akka/discovery/config/ConfigServiceDiscovery.scala b/akka-discovery/src/main/scala/akka/discovery/config/ConfigServiceDiscovery.scala new file mode 100644 index 0000000000..5ddd066cfe --- /dev/null +++ b/akka-discovery/src/main/scala/akka/discovery/config/ConfigServiceDiscovery.scala @@ -0,0 +1,65 @@ +/* + * Copyright (C) 2017-2018 Lightbend Inc. + */ + +package akka.discovery.config + +import akka.actor.ExtendedActorSystem +import akka.annotation.InternalApi +import akka.discovery.{ Lookup, ServiceDiscovery } +import akka.discovery.ServiceDiscovery.{ Resolved, ResolvedTarget } +import akka.event.Logging +import com.typesafe.config.Config + +import scala.collection.JavaConverters._ +import scala.collection.{ breakOut, immutable } +import scala.concurrent.Future +import scala.concurrent.duration.FiniteDuration + +/** + * INTERNAL API + */ +@InternalApi +private object ConfigServicesParser { + def parse(config: Config): Map[String, Resolved] = { + val byService = config + .root() + .entrySet() + .asScala + .map { en ⇒ + (en.getKey, config.getConfig(en.getKey)) + } + .toMap + + byService.map { + case (serviceName, full) ⇒ + val endpoints = full.getConfigList("endpoints").asScala.toList + val resolvedTargets = endpoints.map { c ⇒ + val host = c.getString("host") + val port = if (c.hasPath("port")) Some(c.getInt("port")) else None + ResolvedTarget(host = host, port = port, address = None) + } + (serviceName, Resolved(serviceName, resolvedTargets)) + } + } +} + +/** + * INTERNAL API + */ +@InternalApi +private[akka] class ConfigServiceDiscovery(system: ExtendedActorSystem) extends ServiceDiscovery { + + private val log = Logging(system, getClass) + + private val resolvedServices = ConfigServicesParser.parse( + system.settings.config.getConfig(system.settings.config.getString("akka.discovery.config.services-path")) + ) + + log.debug("Config discovery serving: {}", resolvedServices) + + override def lookup(lookup: Lookup, resolveTimeout: FiniteDuration): Future[Resolved] = { + Future.successful(resolvedServices.getOrElse(lookup.serviceName, Resolved(lookup.serviceName, Nil))) + } + +} diff --git a/akka-discovery/src/main/scala/akka/discovery/dns/DnsServiceDiscovery.scala b/akka-discovery/src/main/scala/akka/discovery/dns/DnsServiceDiscovery.scala new file mode 100644 index 0000000000..c1003b4887 --- /dev/null +++ b/akka-discovery/src/main/scala/akka/discovery/dns/DnsServiceDiscovery.scala @@ -0,0 +1,111 @@ +/* + * Copyright (C) 2017-2018 Lightbend Inc. + */ + +package akka.discovery.dns + +import java.net.InetAddress + +import akka.actor.ExtendedActorSystem +import akka.annotation.InternalApi +import akka.discovery.ServiceDiscovery.{ Resolved, ResolvedTarget } +import akka.event.Logging +import akka.io.{ Dns, IO } +import akka.pattern.ask + +import scala.concurrent.Future +import scala.concurrent.duration.FiniteDuration +import akka.discovery._ +import akka.io.dns.DnsProtocol.{ Ip, Srv } +import akka.io.dns.{ AAAARecord, ARecord, DnsProtocol, SRVRecord } + +import scala.collection.{ immutable ⇒ im } + +/** + * INTERNAL API + */ +@InternalApi +private object DnsServiceDiscovery { + def srvRecordsToResolved(srvRequest: String, resolved: DnsProtocol.Resolved): Resolved = { + val ips: Map[String, im.Seq[InetAddress]] = + resolved.additionalRecords.foldLeft(Map.empty[String, im.Seq[InetAddress]]) { + case (acc, a: ARecord) ⇒ + acc.updated(a.name, a.ip +: acc.getOrElse(a.name, Nil)) + case (acc, a: AAAARecord) ⇒ + acc.updated(a.name, a.ip +: acc.getOrElse(a.name, Nil)) + case (acc, _) ⇒ + acc + } + + val addresses = resolved.records.flatMap { + case srv: SRVRecord ⇒ + val addresses = ips.getOrElse(srv.target, Nil).map(ip ⇒ ResolvedTarget(srv.target, Some(srv.port), Some(ip))) + if (addresses.isEmpty) { + im.Seq(ResolvedTarget(srv.target, Some(srv.port), None)) + } else { + addresses + } + case other ⇒ im.Seq.empty[ResolvedTarget] + } + + Resolved(srvRequest, addresses) + } + +} + +/** + * INTERNAL API + */ +@InternalApi +private[akka] class DnsServiceDiscovery(system: ExtendedActorSystem) extends ServiceDiscovery { + + import DnsServiceDiscovery._ + + import ServiceDiscovery._ + + private val log = Logging(system, getClass) + private val dns = if (system.settings.config.getString("akka.io.dns.resolver") == "async-dns") { + log.debug("using system resolver as it is set to async-dns") + IO(Dns)(system) + } else { + log.debug("system resolver is not async-dns. Loading isolated resolver") + Dns(system).loadAsyncDns("SD-DNS") + } + + import system.dispatcher + + private def cleanIpString(ipString: String): String = + if (ipString.startsWith("/")) ipString.tail else ipString + + override def lookup(lookup: Lookup, resolveTimeout: FiniteDuration): Future[Resolved] = { + lookup match { + case Lookup(name, Some(portName), Some(protocol)) ⇒ + val srvRequest = s"_$portName._$protocol.$name" + log.debug("Lookup [{}] translated to SRV query [{}] as contains portName and protocol", lookup, srvRequest) + dns.ask(DnsProtocol.Resolve(srvRequest, Srv))(resolveTimeout).map { + case resolved: DnsProtocol.Resolved ⇒ + log.debug("Resolved Dns.Resolved: {}", resolved) + srvRecordsToResolved(srvRequest, resolved) + case resolved ⇒ + log.warning("Resolved UNEXPECTED (resolving to Nil): {}", resolved.getClass) + Resolved(srvRequest, Nil) + } + case _ ⇒ + log.debug("Lookup[{}] translated to A/AAAA lookup as does not have portName and protocol", lookup) + dns.ask(DnsProtocol.Resolve(lookup.serviceName, Ip()))(resolveTimeout).map { + case resolved: DnsProtocol.Resolved ⇒ + log.debug("Resolved Dns.Resolved: {}", resolved) + val addresses = resolved.records.collect { + case a: ARecord ⇒ ResolvedTarget(cleanIpString(a.ip.getHostAddress), None, Some(a.ip)) + case a: AAAARecord ⇒ ResolvedTarget(cleanIpString(a.ip.getHostAddress), None, Some(a.ip)) + } + Resolved(lookup.serviceName, addresses) + case resolved ⇒ + log.warning("Resolved UNEXPECTED (resolving to Nil): {}", resolved.getClass) + Resolved(lookup.serviceName, Nil) + + } + } + } + +} diff --git a/akka-discovery/src/test/java/jdoc/akka/discovery/CompileOnlyTest.java b/akka-discovery/src/test/java/jdoc/akka/discovery/CompileOnlyTest.java new file mode 100644 index 0000000000..736b046b44 --- /dev/null +++ b/akka-discovery/src/test/java/jdoc/akka/discovery/CompileOnlyTest.java @@ -0,0 +1,36 @@ +/* + * Copyright (C) 2017-2018 Lightbend Inc. + */ + +package jdoc.akka.discovery; + +import akka.actor.ActorSystem; +import akka.discovery.Lookup; +import akka.discovery.Discovery; +import akka.discovery.ServiceDiscovery; + +import java.time.Duration; +import java.util.concurrent.CompletionStage; + +public class CompileOnlyTest { + public static void example() { + //#loading + ActorSystem as = ActorSystem.create(); + ServiceDiscovery serviceDiscovery = Discovery.get(as).discovery(); + //#loading + + //#basic + serviceDiscovery.lookup(Lookup.create("akka.io"), Duration.ofSeconds(1)); + // convenience for a Lookup with only a serviceName + serviceDiscovery.lookup("akka.io", Duration.ofSeconds(1)); + //#basic + + //#full + CompletionStage lookup = serviceDiscovery.lookup(Lookup.create("akka.io").withPortName("remoting").withProtocol("tcp"), Duration.ofSeconds(1)); + //#full + + // not-used warning + lookup.thenAccept(System.out::println); + + } +} diff --git a/akka-discovery/src/test/scala/akka/discovery/DiscoveryConfigurationSpec.scala b/akka-discovery/src/test/scala/akka/discovery/DiscoveryConfigurationSpec.scala new file mode 100644 index 0000000000..f750870130 --- /dev/null +++ b/akka-discovery/src/test/scala/akka/discovery/DiscoveryConfigurationSpec.scala @@ -0,0 +1,144 @@ +/* + * Copyright (C) 2017-2018 Lightbend Inc. + */ + +package akka.discovery + +import scala.concurrent.Future +import scala.concurrent.duration.FiniteDuration +import akka.actor.ActorSystem +import akka.discovery.ServiceDiscovery.Resolved +import akka.testkit.TestKit +import com.typesafe.config.ConfigFactory +import org.scalatest.Matchers +import org.scalatest.WordSpec + +class DiscoveryConfigurationSpec extends WordSpec with Matchers { + + "ServiceDiscovery" should { + "throw when no default discovery configured" in { + val sys = ActorSystem("DiscoveryConfigurationSpec") + try { + val ex = intercept[Exception] { + Discovery(sys).discovery + } + ex.getMessage should include("No default service discovery implementation configured") + } finally TestKit.shutdownActorSystem(sys) + } + + "select implementation from config by config name (inside akka.discovery namespace)" in { + val className = classOf[FakeTestDiscovery].getCanonicalName + + val sys = ActorSystem("DiscoveryConfigurationSpec", ConfigFactory.parseString(s""" + akka.discovery { + method = akka-mock-inside + + akka-mock-inside { + class = $className + } + } + """).withFallback(ConfigFactory.load())) + + try Discovery(sys).discovery.getClass.getCanonicalName should ===(className) + finally TestKit.shutdownActorSystem(sys) + } + + "load another implementation from config by config name" in { + val className1 = classOf[FakeTestDiscovery].getCanonicalName + val className2 = classOf[FakeTestDiscovery2].getCanonicalName + + val sys = ActorSystem("DiscoveryConfigurationSpec", ConfigFactory.parseString(s""" + akka.discovery { + method = mock1 + + mock1 { + class = $className1 + } + mock2 { + class = $className2 + } + } + """).withFallback(ConfigFactory.load())) + + try { + Discovery(sys).discovery.getClass.getCanonicalName should ===(className1) + Discovery(sys).loadServiceDiscovery("mock2").getClass.getCanonicalName should ===(className2) + } finally TestKit.shutdownActorSystem(sys) + } + + "return same instance for same method" in { + val className1 = classOf[FakeTestDiscovery].getCanonicalName + val className2 = classOf[FakeTestDiscovery2].getCanonicalName + + val sys = ActorSystem("DiscoveryConfigurationSpec", ConfigFactory.parseString(s""" + akka.discovery { + method = mock1 + + mock1 { + class = $className1 + } + mock2 { + class = $className2 + } + } + """).withFallback(ConfigFactory.load())) + + try { + Discovery(sys).loadServiceDiscovery("mock2") should be theSameInstanceAs Discovery(sys) + .loadServiceDiscovery("mock2") + + Discovery(sys).discovery should be theSameInstanceAs Discovery(sys).loadServiceDiscovery("mock1") + } finally TestKit.shutdownActorSystem(sys) + } + + "throw a specific discovery method exception" in { + val className = classOf[ExceptionThrowingDiscovery].getCanonicalName + + val sys = ActorSystem("DiscoveryConfigurationSpec", ConfigFactory.parseString(s""" + akka.discovery { + method = "mock1" + mock1 { + class = $className + } + } + """).withFallback(ConfigFactory.load())) + + try { + an[DiscoveryException] should be thrownBy Discovery(sys).discovery + } finally TestKit.shutdownActorSystem(sys) + } + + "throw an illegal argument exception for not existing method" in { + val className = "className" + + val sys = ActorSystem("DiscoveryConfigurationSpec", ConfigFactory.parseString(s""" + akka.discovery { + method = "$className" + } + """).withFallback(ConfigFactory.load())) + + try { + an[IllegalArgumentException] should be thrownBy Discovery(sys).discovery + } finally TestKit.shutdownActorSystem(sys) + } + + } + +} + +class FakeTestDiscovery extends ServiceDiscovery { + + override def lookup(lookup: Lookup, resolveTimeout: FiniteDuration): Future[Resolved] = ??? +} + +class FakeTestDiscovery2 extends FakeTestDiscovery + +class DiscoveryException(message: String) extends Exception + +class ExceptionThrowingDiscovery extends ServiceDiscovery { + + def lookup(lookup: Lookup, resolveTimeout: FiniteDuration): Future[Resolved] = ??? + + throw new DiscoveryException("Test Exception") + +} diff --git a/akka-discovery/src/test/scala/akka/discovery/aggregate/AggregateServiceDiscoverySpec.scala b/akka-discovery/src/test/scala/akka/discovery/aggregate/AggregateServiceDiscoverySpec.scala new file mode 100644 index 0000000000..96969462b7 --- /dev/null +++ b/akka-discovery/src/test/scala/akka/discovery/aggregate/AggregateServiceDiscoverySpec.scala @@ -0,0 +1,122 @@ +/* + * Copyright (C) 2017-2018 Lightbend Inc. + */ + +package akka.discovery.aggregate + +import akka.actor.{ ActorSystem, ExtendedActorSystem } +import akka.discovery.ServiceDiscovery.{ Resolved, ResolvedTarget } +import akka.discovery.{ Lookup, Discovery, ServiceDiscovery } +import akka.testkit.TestKit +import com.typesafe.config.{ Config, ConfigFactory } +import org.scalatest.concurrent.ScalaFutures +import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpecLike } + +import scala.concurrent.Future +import scala.concurrent.duration._ +import scala.collection.immutable + +class StubbedServiceDiscovery(system: ExtendedActorSystem) extends ServiceDiscovery { + + override def lookup(query: Lookup, resolveTimeout: FiniteDuration): Future[Resolved] = { + if (query.serviceName == "stubbed") { + Future.successful(Resolved( + query.serviceName, + immutable.Seq( + ResolvedTarget(host = "stubbed1", port = Some(1234), address = None) + ))) + } else if (query.serviceName == "fail") { + Future.failed(new RuntimeException("No resolving for you!")) + } else { + Future.successful(Resolved(query.serviceName, immutable.Seq.empty)) + } + } +} + +object AggregateServiceDiscoverySpec { + val config: Config = ConfigFactory.parseString(""" + akka { + loglevel = DEBUG + discovery { + method = aggregate + + aggregate { + discovery-methods = ["stubbed1", "config"] + } + } + } + + akka.discovery.stubbed1 { + class = akka.discovery.aggregate.StubbedServiceDiscovery + } + + akka.discovery.config.services = { + config1 = { + endpoints = [ + { + host = "cat" + port = 1233 + }, + { + host = "dog" + port = 1234 + } + ] + }, + fail = { + endpoints = [ + { + host = "from-config" + } + ] + } + } + """) +} + +class AggregateServiceDiscoverySpec + extends TestKit(ActorSystem("AggregateDiscoverySpec", AggregateServiceDiscoverySpec.config)) + with WordSpecLike + with Matchers + with BeforeAndAfterAll + with ScalaFutures { + + override protected def afterAll(): Unit = { + TestKit.shutdownActorSystem(system) + } + + val discovery: ServiceDiscovery = Discovery(system).discovery + + "Aggregate service discovery" must { + + "only call first one if returns results" in { + val results = discovery.lookup("stubbed", 100.millis).futureValue + results shouldEqual Resolved( + "stubbed", + immutable.Seq( + ResolvedTarget(host = "stubbed1", port = Some(1234), address = None) + )) + } + + "move onto the next if no resolved targets" in { + val results = discovery.lookup("config1", 100.millis).futureValue + results shouldEqual Resolved( + "config1", + immutable.Seq( + ResolvedTarget(host = "cat", port = Some(1233), address = None), + ResolvedTarget(host = "dog", port = Some(1234), address = None) + )) + } + + "move onto next if fails" in { + val results = discovery.lookup("fail", 100.millis).futureValue + // Stub fails then result comes from config + results shouldEqual Resolved( + "fail", + immutable.Seq( + ResolvedTarget(host = "from-config", port = None, address = None) + )) + } + } + +} diff --git a/akka-discovery/src/test/scala/akka/discovery/config/ConfigServiceDiscoverySpec.scala b/akka-discovery/src/test/scala/akka/discovery/config/ConfigServiceDiscoverySpec.scala new file mode 100644 index 0000000000..f546fa59e8 --- /dev/null +++ b/akka-discovery/src/test/scala/akka/discovery/config/ConfigServiceDiscoverySpec.scala @@ -0,0 +1,78 @@ +/* + * Copyright (C) 2017-2018 Lightbend Inc. + */ + +package akka.discovery.config + +import akka.actor.ActorSystem +import akka.discovery.Discovery +import akka.discovery.ServiceDiscovery.ResolvedTarget +import akka.testkit.TestKit +import com.typesafe.config.{ Config, ConfigFactory } +import org.scalatest.concurrent.ScalaFutures +import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpecLike } + +import scala.concurrent.duration._ +import scala.collection.immutable + +object ConfigServiceDiscoverySpec { + + val config: Config = ConfigFactory.parseString(""" +akka { + loglevel = DEBUG + discovery { + method = config + config { + services = { + service1 = { + endpoints = [ + { + host = "cat" + port = 1233 + }, + { + host = "dog" + } + ] + }, + service2 = { + endpoints = [] + } + } + } + } +} + """) + +} + +class ConfigServiceDiscoverySpec + extends TestKit(ActorSystem("ConfigDiscoverySpec", ConfigServiceDiscoverySpec.config)) + with WordSpecLike + with Matchers + with BeforeAndAfterAll + with ScalaFutures { + + override protected def afterAll(): Unit = { + TestKit.shutdownActorSystem(system) + } + + val discovery = Discovery(system).discovery + + "Config discovery" must { + "load from config" in { + val result = discovery.lookup("service1", 100.millis).futureValue + result.serviceName shouldEqual "service1" + result.addresses shouldEqual immutable.Seq( + ResolvedTarget(host = "cat", port = Some(1233), address = None), + ResolvedTarget(host = "dog", port = None, address = None) + ) + } + + "return no resolved targets if not in config" in { + val result = discovery.lookup("dontexist", 100.millis).futureValue + result.serviceName shouldEqual "dontexist" + result.addresses shouldEqual immutable.Seq.empty + } + } +} diff --git a/akka-discovery/src/test/scala/akka/discovery/config/ConfigServicesParserSpec.scala b/akka-discovery/src/test/scala/akka/discovery/config/ConfigServicesParserSpec.scala new file mode 100644 index 0000000000..3786428cbe --- /dev/null +++ b/akka-discovery/src/test/scala/akka/discovery/config/ConfigServicesParserSpec.scala @@ -0,0 +1,52 @@ +/* + * Copyright (C) 2017-2018 Lightbend Inc. + */ + +package akka.discovery.config + +import akka.discovery.ServiceDiscovery.{ Resolved, ResolvedTarget } +import akka.discovery.config.ConfigServicesParserSpec._ +import com.typesafe.config.{ Config, ConfigFactory } +import org.scalatest.{ Matchers, WordSpec } + +import scala.collection.immutable + +object ConfigServicesParserSpec { + val exampleConfig: Config = ConfigFactory.parseString(""" + services { + service1 { + endpoints = [ + { + host = "cat" + port = 1233 + }, + { + host = "dog" + } + ] + }, + service2 { + endpoints = [] + } + } + """.stripMargin) +} + +class ConfigServicesParserSpec extends WordSpec with Matchers { + + "Config parsing" must { + "parse" in { + val config = exampleConfig.getConfig("services") + + val result = ConfigServicesParser.parse(config) + + result("service1") shouldEqual Resolved( + "service1", + immutable.Seq( + ResolvedTarget(host = "cat", port = Some(1233), address = None), + ResolvedTarget(host = "dog", port = None, address = None) + )) + result("service2") shouldEqual Resolved("service2", immutable.Seq()) + } + } +} diff --git a/akka-discovery/src/test/scala/akka/discovery/dns/DnsDiscoverySpec.scala b/akka-discovery/src/test/scala/akka/discovery/dns/DnsDiscoverySpec.scala new file mode 100644 index 0000000000..636757555d --- /dev/null +++ b/akka-discovery/src/test/scala/akka/discovery/dns/DnsDiscoverySpec.scala @@ -0,0 +1,119 @@ +/* + * Copyright (C) 2018 Lightbend Inc. + */ + +package akka.discovery.dns + +import java.net.InetAddress + +import akka.actor.ActorSystem +import akka.discovery.{ Discovery, Lookup } +import akka.discovery.ServiceDiscovery.ResolvedTarget +import akka.io.dns.DockerBindDnsService +import akka.testkit.{ AkkaSpec, SocketUtil, TestKit } +import com.typesafe.config.ConfigFactory + +import scala.concurrent.duration._ + +object DnsDiscoverySpec { + + val config = ConfigFactory.parseString( + s""" + //#configure-dns + akka { + discovery { + method = akka-dns + } + } + //#configure-dns + akka { + loglevel = DEBUG + } + akka.io.dns.async-dns.nameservers = ["localhost:${DnsDiscoverySpec.dockerDnsServerPort}"] + """) + + lazy val dockerDnsServerPort = SocketUtil.temporaryLocalPort() + + val configWithAsyncDnsResolverAsDefault = ConfigFactory.parseString( + """ + akka.io.dns.resolver = "async-dns" + """).withFallback(config) + +} + +class DnsDiscoverySpec extends AkkaSpec(DnsDiscoverySpec.config) + with DockerBindDnsService { + + import DnsDiscoverySpec._ + + override val hostPort: Int = DnsDiscoverySpec.dockerDnsServerPort + + val systemWithAsyncDnsAsResolver = ActorSystem("AsyncDnsSystem", configWithAsyncDnsResolverAsDefault) + + "Dns Discovery with isolated resolver" must { + + if (!dockerAvailable()) { + system.log.error("Test not run as docker is not available") + pending + } + + "work with SRV records" in { + val discovery = Discovery(system).discovery + val name = "_service._tcp.foo.test." + val result = + discovery + .lookup(Lookup("foo.test.").withPortName("service").withProtocol("tcp"), resolveTimeout = 10.seconds) + .futureValue + result.addresses.toSet shouldEqual Set( + ResolvedTarget("a-single.foo.test", Some(5060), Some(InetAddress.getByName("192.168.1.20"))), + ResolvedTarget("a-double.foo.test", Some(65535), Some(InetAddress.getByName("192.168.1.21"))), + ResolvedTarget("a-double.foo.test", Some(65535), Some(InetAddress.getByName("192.168.1.22"))) + ) + result.serviceName shouldEqual name + } + + "work with IP records" in { + val discovery = Discovery(system).discovery + val name = "a-single.foo.test" + val result = discovery.lookup(name, resolveTimeout = 500.milliseconds).futureValue + result.serviceName shouldEqual name + result.addresses.toSet shouldEqual Set( + ResolvedTarget("192.168.1.20", None) + ) + } + + "be using its own resolver" in { + // future will fail if it it doesn't exist + system.actorSelection("/system/SD-DNS/async-dns").resolveOne(2.seconds).futureValue + } + + } + + "Dns discovery with the system resolver" must { + "work with SRV records" in { + val discovery = Discovery(systemWithAsyncDnsAsResolver).discovery + val name = "_service._tcp.foo.test." + val result = + discovery + .lookup(Lookup("foo.test.").withPortName("service").withProtocol("tcp"), resolveTimeout = 10.seconds) + .futureValue + result.addresses.toSet shouldEqual Set( + ResolvedTarget("a-single.foo.test", Some(5060), Some(InetAddress.getByName("192.168.1.20"))), + ResolvedTarget("a-double.foo.test", Some(65535), Some(InetAddress.getByName("192.168.1.21"))), + ResolvedTarget("a-double.foo.test", Some(65535), Some(InetAddress.getByName("192.168.1.22"))) + ) + result.serviceName shouldEqual name + } + + "be using the system resolver" in { + // check the service discovery one doesn't exist + systemWithAsyncDnsAsResolver.actorSelection("/system/SD-DNS/async-dns").resolveOne(2.seconds).failed.futureValue + } + + } + + override def afterTermination(): Unit = { + super.afterTermination() + TestKit.shutdownActorSystem(system) + } +} diff --git a/akka-discovery/src/test/scala/akka/discovery/dns/DnsServiceDiscoverySpec.scala b/akka-discovery/src/test/scala/akka/discovery/dns/DnsServiceDiscoverySpec.scala new file mode 100644 index 0000000000..ce7a9d5e6a --- /dev/null +++ b/akka-discovery/src/test/scala/akka/discovery/dns/DnsServiceDiscoverySpec.scala @@ -0,0 +1,67 @@ +/* + * Copyright (C) 2017-2018 Lightbend Inc. + */ + +package akka.discovery.dns + +import java.net.{ Inet6Address, InetAddress } + +import akka.discovery.ServiceDiscovery +import akka.discovery.ServiceDiscovery.{ Resolved, ResolvedTarget } +import akka.io.dns.CachePolicy.Ttl +import akka.io.dns.{ AAAARecord, ARecord, DnsProtocol, SRVRecord } +import org.scalatest.{ Matchers, WordSpec } + +import scala.collection.{ immutable ⇒ im } +import scala.concurrent.duration._ + +class DnsServiceDiscoverySpec extends WordSpec with Matchers { + "srvRecordsToResolved" must { + "fill in ips from A records" in { + val resolved = DnsProtocol.Resolved("cats.com", im.Seq(new SRVRecord("cats.com", Ttl.fromPositive(1.second), 2, 3, 4, "kittens.com")), + im.Seq( + new ARecord("kittens.com", Ttl.fromPositive(1.second), InetAddress.getByName("127.0.0.2")), + new ARecord("kittens.com", Ttl.fromPositive(1.second), InetAddress.getByName("127.0.0.3")), + new ARecord("donkeys.com", Ttl.fromPositive(1.second), InetAddress.getByName("127.0.0.4")) + )) + + val result: ServiceDiscovery.Resolved = + DnsServiceDiscovery.srvRecordsToResolved("cats.com", resolved) + + result.serviceName shouldEqual "cats.com" + result.addresses.toSet shouldEqual Set( + ResolvedTarget("kittens.com", Some(4), Some(InetAddress.getByName("127.0.0.2"))), + ResolvedTarget("kittens.com", Some(4), Some(InetAddress.getByName("127.0.0.3"))) + ) + } + + // Naughty DNS server + "use SRV target and port if no additional records" in { + val resolved = DnsProtocol.Resolved("cats.com", im.Seq(new SRVRecord("cats.com", Ttl.fromPositive(1.second), 2, 3, 8080, "kittens.com")), + im.Seq(new ARecord("donkeys.com", Ttl.fromPositive(1.second), InetAddress.getByName("127.0.0.4")))) + + val result = + DnsServiceDiscovery.srvRecordsToResolved("cats.com", resolved) + + result shouldEqual Resolved("cats.com", im.Seq(ResolvedTarget("kittens.com", Some(8080), None))) + } + + "fill in ips from AAAA records" in { + val resolved = DnsProtocol.Resolved("cats.com", im.Seq(new SRVRecord("cats1.com", Ttl.fromPositive(1.second), 2, 3, 4, "kittens.com")), + im.Seq( + new AAAARecord("kittens.com", Ttl.fromPositive(2.seconds), InetAddress.getByName("::1").asInstanceOf[Inet6Address]), + new AAAARecord("kittens.com", Ttl.fromPositive(2.seconds), InetAddress.getByName("::2").asInstanceOf[Inet6Address]), + new AAAARecord("donkeys.com", Ttl.fromPositive(2.seconds), InetAddress.getByName("::3").asInstanceOf[Inet6Address]) + )) + + val result: ServiceDiscovery.Resolved = + DnsServiceDiscovery.srvRecordsToResolved("cats.com", resolved) + + result.serviceName shouldEqual "cats.com" + result.addresses.toSet shouldEqual Set( + ResolvedTarget("kittens.com", Some(4), Some(InetAddress.getByName("::1"))), + ResolvedTarget("kittens.com", Some(4), Some(InetAddress.getByName("::2"))) + ) + } + } +} diff --git a/akka-discovery/src/test/scala/doc/akka/discovery/CompileOnlySpec.scala b/akka-discovery/src/test/scala/doc/akka/discovery/CompileOnlySpec.scala new file mode 100644 index 0000000000..3aa2796f36 --- /dev/null +++ b/akka-discovery/src/test/scala/doc/akka/discovery/CompileOnlySpec.scala @@ -0,0 +1,34 @@ +/* + * Copyright (C) 2017-2018 Lightbend Inc. + */ + +package doc.akka.discovery + +import akka.actor.ActorSystem +import akka.discovery.{ Discovery, Lookup, ServiceDiscovery } + +import scala.concurrent.Future +import scala.concurrent.duration._ +import scala.concurrent.ExecutionContext.Implicits.global + +object CompileOnlySpec { + + //#loading + val system = ActorSystem() + val serviceDiscovery = Discovery(system).discovery + //#loading + + //#basic + serviceDiscovery.lookup(Lookup("akka.io"), 1.second) + // Convenience for a Lookup with only a serviceName + serviceDiscovery.lookup("akka.io", 1.second) + //#basic + + //#full + val lookup: Future[ServiceDiscovery.Resolved] = serviceDiscovery.lookup(Lookup("akka.io").withPortName("remoting").withProtocol("tcp"), 1.second) + //#full + + // compiler + lookup.foreach(println) + +} diff --git a/akka-docs/src/main/paradox/discovery/index.md b/akka-docs/src/main/paradox/discovery/index.md new file mode 100644 index 0000000000..e45b8e4b24 --- /dev/null +++ b/akka-docs/src/main/paradox/discovery/index.md @@ -0,0 +1,245 @@ +# Discovery + +@@@ warning + +This module is currently marked as @ref:[may change](../common/may-change.md) +This means that API or semantics can change without warning or deprecation period. + +@@@ + +Akka Discovery provides an interface around various ways of locating services. The built in methods are: + +* Configuration +* DNS +* Aggregate + +In addition [Akka Management](https://developer.lightbend.com/docs/akka-management/current/) contains methods such as: + +* Kubernetes API +* AWS +* Consul +* Marathon API + +Discovery used to be part of Akka Management but has become an Akka module as of `2.5.19` of Akka and version `0.21.0` +of Akka Management. If you're also using Akka Management for other service discovery methods or bootstrap make +sure you are using at least version `0.21.0`. + +## Dependency + +@@dependency[sbt,Gradle,Maven] { + group="com.typesafe.akka" + artifact="akka-discovery_$scala.binary_version$" + version="$akka.version$" +} + +## How it works + +Loading the extension: + +Scala +: @@snip [CompileOnlySpec.scala](/akka-discovery/src/test/scala/doc/akka/discovery/CompileOnlySpec.scala) { #loading } + +Java +: @@snip [CompileOnlyTest.java](/akka-discovery/src/test/java/jdoc/akka/discovery/CompileOnlyTest.java) { #loading } + +A `Lookup` contains a mandatory `serviceName` and an optional `portName` and `protocol`. How these are interpreted is discovery +method dependent e.g.DNS does an A/AAAA record query if any of the fields are missing and an SRV query for a full look up: + +Scala +: @@snip [CompileOnlySpec.scala](/akka-discovery/src/test/scala/doc/akka/discovery/CompileOnlySpec.scala) { #basic } + +Java +: @@snip [CompileOnlyTest.java](/akka-discovery/src/test/java/jdoc/akka/discovery/CompileOnlyTest.java) { #basic } + + +`portName` and `protocol` are optional and their meaning is interpreted by the method. + +Scala +: @@snip [CompileOnlySpec.scala](/akka-discovery/src/test/scala/doc/akka/discovery/CompileOnlySpec.scala) { #full } + +Java +: @@snip [CompileOnlyTest.java](/akka-discovery/src/test/java/jdoc/akka/discovery/CompileOnlyTest.java) { #full } + +Port can be used when a service opens multiple ports e.g. a HTTP port and an Akka remoting port. + +## Discovery Method: DNS + +DNS discovery maps `Lookup` queries as follows: + +* `serviceName`, `portName` and `protocol` set: SRV query in the form: `_port._protocol._name` Where the `_`s are added. +* Any query missing any of the fields is mapped to a A/AAAA query for the `serviceName` + +The mapping between Akka service discovery terminology and SRV terminology: + +* SRV service = port +* SRV name = serviceName +* SRV protocol = protocol + +Configure `akka-dns` to be used as the discovery implementation in your `application.conf`: + +@@snip[application.conf](/akka-discovery/src/test/scala/akka/discovery/dns/DnsDiscoverySpec.scala){ #configure-dns } + +From there on, you can use the generic API that hides the fact which discovery method is being used by calling:: + +Scala +: ```scala + import akka.discovery.ServiceDiscovery + val system = ActorSystem("Example") + // ... + val discovery = ServiceDiscovery(system).discovery + val result: Future[Resolved] = discovery.lookup("service-name", resolveTimeout = 500 milliseconds) + ``` + +Java +: ```java + import akka.discovery.ServiceDiscovery; + ActorSystem system = ActorSystem.create("Example"); + // ... + SimpleServiceDiscovery discovery = ServiceDiscovery.get(system).discovery(); + Future result = discovery.lookup("service-name", Duration.create("500 millis")); + ``` + +### How it works + +DNS discovery will use either A/AAAA records or SRV records depending on whether a `Simple` or `Full` lookup is issued.. +The advantage of SRV records is that they can include a port. + +#### SRV records + +Lookups with all the fields set become SRV queries. For example: + +``` +dig srv _service._tcp.akka.test + +; <<>> DiG 9.11.3-RedHat-9.11.3-6.fc28 <<>> srv service.tcp.akka.test +;; global options: +cmd +;; Got answer: +;; ->>HEADER<<- opcode: QUERY, status: NOERROR, id: 60023 +;; flags: qr aa rd ra; QUERY: 1, ANSWER: 2, AUTHORITY: 1, ADDITIONAL: 5 + +;; OPT PSEUDOSECTION: +; EDNS: version: 0, flags:; udp: 4096 +; COOKIE: 5ab8dd4622e632f6190f54de5b28bb8fb1b930a5333c3862 (good) +;; QUESTION SECTION: +;service.tcp.akka.test. IN SRV + +;; ANSWER SECTION: +_service._tcp.akka.test. 86400 IN SRV 10 60 5060 a-single.akka.test. +_service._tcp.akka.test. 86400 IN SRV 10 40 5070 a-double.akka.test. + +``` + +In this case `service.tcp.akka.test` resolves to `a-single.akka.test` on port `5060` +and `a-double.akka.test` on port `5070`. Currently discovery does not support the weightings. + +#### A/AAAA records + +Lookups with any fields missing become A/AAAA record queries. For example: + +``` +dig a-double.akka.test + +; <<>> DiG 9.11.3-RedHat-9.11.3-6.fc28 <<>> a-double.akka.test +;; global options: +cmd +;; Got answer: +;; ->>HEADER<<- opcode: QUERY, status: NOERROR, id: 11983 +;; flags: qr aa rd ra; QUERY: 1, ANSWER: 2, AUTHORITY: 1, ADDITIONAL: 2 + +;; OPT PSEUDOSECTION: +; EDNS: version: 0, flags:; udp: 4096 +; COOKIE: 16e9815d9ca2514d2f3879265b28bad05ff7b4a82721edd0 (good) +;; QUESTION SECTION: +;a-double.akka.test. IN A + +;; ANSWER SECTION: +a-double.akka.test. 86400 IN A 192.168.1.21 +a-double.akka.test. 86400 IN A 192.168.1.22 + +``` + +In this case `a-double.akka.test` would resolve to `192.168.1.21` and `192.168.1.22`. + +## Discovery Method: Configuration + +Configuration currently ignores all fields apart from service name. + +For simple use cases configuration can be used for service discovery. The advantage of using Akka Discovery with +configuration rather than your own configuration values is that applications can be migrated to a more +sophisticated discovery method without any code changes. + + +Configure it to be used as discovery method in your `application.conf` + +``` +akka { + discovery.method = config +} +``` + +By default the services discoverable are defined in `akka.discovery.config.services` and have the following format: + +``` +akka.discovery.config.services = { + service1 = { + endpoints = [ + { + host = "cat" + port = 1233 + }, + { + host = "dog" + port = 1234 + } + ] + }, + service2 = { + endpoints = [] + } +} +``` + +Where the above block defines two services, `service1` and `service2`. +Each service can have multiple endpoints. + +## Discovery Method: Aggregate multiple discovery methods + +Aggregate discovery allows multiple discovery methods to be aggregated e.g. try and resolve +via DNS and fall back to configuration. + +To use aggregate discovery add its dependency as well as all of the discovery that you +want to aggregate. + +Configure `aggregate` as `akka.discovery.method` and which discovery methods are tried and in which order. + +``` +akka { + discovery { + method = aggregate + aggregate { + discovery-methods = ["akka-dns", "config"] + } + config { + services { + service1 { + endpoints [ + { + host = "host1" + port = 1233 + }, + { + host = "host2" + port = 1234 + } + ] + } + } + } + } +} + +``` + +The above configuration will result in `akka-dns` first being checked and if it fails or returns no +targets for the given service name then `config` is queried which i configured with one service called +`service1` which two hosts `host1` and `host2`. + diff --git a/akka-docs/src/main/paradox/index.md b/akka-docs/src/main/paradox/index.md index 70fb8de5b6..2868099797 100644 --- a/akka-docs/src/main/paradox/index.md +++ b/akka-docs/src/main/paradox/index.md @@ -12,6 +12,7 @@ * [index-cluster](index-cluster.md) * [stream/index](stream/index.md) * [index-network](index-network.md) +* [discovery](discovery/index.md) * [index-futures](index-futures.md) * [index-utilities](index-utilities.md) * [common/other-modules](common/other-modules.md) diff --git a/build.sbt b/build.sbt index b678ce881a..a6c668f34f 100644 --- a/build.sbt +++ b/build.sbt @@ -45,7 +45,8 @@ lazy val aggregatedProjects: Seq[ProjectReference] = Seq( actorTyped, actorTypedTests, actorTestkitTyped, persistenceTyped, clusterTyped, clusterShardingTyped, - streamTyped + streamTyped, + discovery ) lazy val root = Project( @@ -479,6 +480,16 @@ lazy val actorTypedTests = akkaModule("akka-actor-typed-tests") .disablePlugins(MimaPlugin) .enablePlugins(NoPublish) +lazy val discovery = akkaModule("akka-discovery") + .dependsOn( + actor, + testkit % "test->test", + actorTests % "test->test" + ) + .settings(Dependencies.discovery) + .settings(AkkaBuild.mayChangeSettings) + .settings(AutomaticModuleName.settings("akka.discovery")) + .settings(OSGi.discovery) def akkaModule(name: String): Project = Project(id = name, base = file(name)) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 5825802067..5abdb89855 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -148,6 +148,8 @@ object Dependencies { val actor = l ++= Seq(config, java8Compat.value) + val discovery = l ++= Seq(Test.junit, Test.scalatest.value) + val testkit = l ++= Seq(Test.junit, Test.scalatest.value) ++ Test.metricsAll val actorTests = l ++= Seq( diff --git a/project/MiMa.scala b/project/MiMa.scala index aaed98c896..5c711713d1 100644 --- a/project/MiMa.scala +++ b/project/MiMa.scala @@ -24,7 +24,8 @@ object MiMa extends AutoPlugin { val versions: Seq[String] = { val akka24NoStreamVersions = Seq("2.4.0", "2.4.1") val akka25Versions = (0 to latestPatchOf25).map(patch ⇒ s"2.5.$patch") - val akka24StreamVersions = (2 to 12) map ("2.4." + _) + val akka24StreamVersions = (2 to 12).map("2.4." + _) + val akka25DiscoveryVersions = (19 to latestPatchOf25).map(patch => s"2.5.$patch") val akka24WithScala212 = (13 to latestPatchOf24) .map("2.4." + _) @@ -35,17 +36,24 @@ object MiMa extends AutoPlugin { "akka-stream-testkit") val akka250NewArtifacts = Seq( "akka-persistence-query") + val akka2519NewArtifacts = Seq( + "akka-discovery" + ) scalaBinaryVersion match { case "2.11" ⇒ - if (akka250NewArtifacts.contains(projectName)) akka25Versions + if (akka2519NewArtifacts.contains(projectName)) + akka25DiscoveryVersions + else if (akka250NewArtifacts.contains(projectName)) akka25Versions else { if (!akka242NewArtifacts.contains(projectName)) akka24NoStreamVersions else Seq.empty } ++ akka24StreamVersions ++ akka24WithScala212 ++ akka25Versions case "2.12" ⇒ - if (akka250NewArtifacts.contains(projectName)) + if (akka2519NewArtifacts.contains(projectName)) + akka25DiscoveryVersions + else if (akka250NewArtifacts.contains(projectName)) akka25Versions else akka24WithScala212 ++ akka25Versions diff --git a/project/OSGi.scala b/project/OSGi.scala index 59b928e681..7fc59fac86 100644 --- a/project/OSGi.scala +++ b/project/OSGi.scala @@ -122,6 +122,8 @@ object OSGi { val testkit = exports(Seq("akka.testkit.*")) + val discovery = exports(Seq("akka.discovery.*")) + val osgiOptionalImports = Seq( // needed because testkit is normally not used in the application bundle, // but it should still be included as transitive dependency and used by BundleDelegatingClassLoader