Towards stable APIs of akka-discovery (#26172)
* replaced case classes to make it easier to evolve those * remove ResolvedTarget.apply with 2 parameters * because it InetAddress.getByName may perform blocking DNS lookup
This commit is contained in:
parent
4bf896d865
commit
bd2cc02eaa
4 changed files with 149 additions and 40 deletions
|
|
@ -0,0 +1,35 @@
|
||||||
|
# Removed case class
|
||||||
|
ProblemFilters.exclude[MissingTypesProblem]("akka.discovery.ServiceDiscovery$Resolved")
|
||||||
|
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.discovery.ServiceDiscovery#Resolved.productElement")
|
||||||
|
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.discovery.ServiceDiscovery#Resolved.productArity")
|
||||||
|
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.discovery.ServiceDiscovery#Resolved.copy$default$2")
|
||||||
|
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.discovery.ServiceDiscovery#Resolved.canEqual")
|
||||||
|
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.discovery.ServiceDiscovery#Resolved.copy")
|
||||||
|
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.discovery.ServiceDiscovery#Resolved.copy$default$1")
|
||||||
|
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.discovery.ServiceDiscovery#Resolved.productIterator")
|
||||||
|
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.discovery.ServiceDiscovery#Resolved.productPrefix")
|
||||||
|
ProblemFilters.exclude[MissingTypesProblem]("akka.discovery.ServiceDiscovery$ResolvedTarget")
|
||||||
|
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.discovery.ServiceDiscovery#ResolvedTarget.apply")
|
||||||
|
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.discovery.ServiceDiscovery#ResolvedTarget.productElement")
|
||||||
|
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.discovery.ServiceDiscovery#ResolvedTarget.productArity")
|
||||||
|
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.discovery.ServiceDiscovery#ResolvedTarget.copy$default$2")
|
||||||
|
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.discovery.ServiceDiscovery#ResolvedTarget.canEqual")
|
||||||
|
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.discovery.ServiceDiscovery#ResolvedTarget.copy")
|
||||||
|
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.discovery.ServiceDiscovery#ResolvedTarget.copy$default$1")
|
||||||
|
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.discovery.ServiceDiscovery#ResolvedTarget.productIterator")
|
||||||
|
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.discovery.ServiceDiscovery#ResolvedTarget.productPrefix")
|
||||||
|
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.discovery.ServiceDiscovery#ResolvedTarget.copy$default$3")
|
||||||
|
ProblemFilters.exclude[MissingTypesProblem]("akka.discovery.Lookup")
|
||||||
|
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.discovery.Lookup.productElement")
|
||||||
|
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.discovery.Lookup.productArity")
|
||||||
|
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.discovery.Lookup.copy$default$2")
|
||||||
|
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.discovery.Lookup.canEqual")
|
||||||
|
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.discovery.Lookup.copy")
|
||||||
|
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.discovery.Lookup.copy$default$1")
|
||||||
|
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.discovery.Lookup.productIterator")
|
||||||
|
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.discovery.Lookup.productPrefix")
|
||||||
|
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.discovery.Lookup.copy$default$3")
|
||||||
|
ProblemFilters.exclude[MissingTypesProblem]("akka.discovery.ServiceDiscovery$ResolvedTarget$")
|
||||||
|
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.discovery.ServiceDiscovery#ResolvedTarget.unapply")
|
||||||
|
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.discovery.Lookup.unapply")
|
||||||
|
ProblemFilters.exclude[MissingTypesProblem]("akka.discovery.ServiceDiscovery$Resolved$")
|
||||||
|
|
@ -9,19 +9,27 @@ import java.util.Optional
|
||||||
import java.util.concurrent.CompletionStage
|
import java.util.concurrent.CompletionStage
|
||||||
import java.util.concurrent.TimeUnit
|
import java.util.concurrent.TimeUnit
|
||||||
|
|
||||||
import akka.actor.DeadLetterSuppression
|
|
||||||
import akka.annotation.ApiMayChange
|
|
||||||
|
|
||||||
import scala.collection.immutable
|
import scala.collection.immutable
|
||||||
import scala.concurrent.Future
|
import scala.concurrent.Future
|
||||||
import scala.concurrent.duration.FiniteDuration
|
import scala.concurrent.duration.FiniteDuration
|
||||||
import scala.util.Try
|
|
||||||
|
import akka.actor.DeadLetterSuppression
|
||||||
|
import akka.annotation.ApiMayChange
|
||||||
|
import akka.util.HashCode
|
||||||
|
|
||||||
@ApiMayChange
|
@ApiMayChange
|
||||||
object ServiceDiscovery {
|
object ServiceDiscovery {
|
||||||
|
|
||||||
|
object Resolved {
|
||||||
|
def apply(serviceName: String, addresses: immutable.Seq[ResolvedTarget]): Resolved =
|
||||||
|
new Resolved(serviceName, addresses)
|
||||||
|
|
||||||
|
def unapply(resolved: Resolved): Option[(String, immutable.Seq[ResolvedTarget])] =
|
||||||
|
Some((resolved.serviceName, resolved.addresses))
|
||||||
|
}
|
||||||
|
|
||||||
/** Result of a successful resolve request */
|
/** Result of a successful resolve request */
|
||||||
final case class Resolved(serviceName: String, addresses: immutable.Seq[ResolvedTarget])
|
final class Resolved(val serviceName: String, val addresses: immutable.Seq[ResolvedTarget])
|
||||||
extends DeadLetterSuppression {
|
extends DeadLetterSuppression {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -31,6 +39,21 @@ object ServiceDiscovery {
|
||||||
import scala.collection.JavaConverters._
|
import scala.collection.JavaConverters._
|
||||||
addresses.asJava
|
addresses.asJava
|
||||||
}
|
}
|
||||||
|
|
||||||
|
override def toString: String = s"Resolved($serviceName,$addresses)"
|
||||||
|
|
||||||
|
override def equals(obj: Any): Boolean = obj match {
|
||||||
|
case other: Resolved ⇒ serviceName == other.serviceName && addresses == other.addresses
|
||||||
|
case _ ⇒ false
|
||||||
|
}
|
||||||
|
|
||||||
|
override def hashCode(): Int = {
|
||||||
|
var result = HashCode.SEED
|
||||||
|
result = HashCode.hash(result, serviceName)
|
||||||
|
result = HashCode.hash(result, addresses)
|
||||||
|
result
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
object ResolvedTarget {
|
object ResolvedTarget {
|
||||||
|
|
@ -43,8 +66,13 @@ object ServiceDiscovery {
|
||||||
(t.address, t.host, t.port)
|
(t.address, t.host, t.port)
|
||||||
}
|
}
|
||||||
|
|
||||||
def apply(host: String, port: Option[Int]): ResolvedTarget =
|
/**
|
||||||
ResolvedTarget(host, port, Try(InetAddress.getByName(host)).toOption)
|
* @param host the hostname or the IP address of the target
|
||||||
|
* @param port optional port number
|
||||||
|
* @param address IP address of the target. This is used during cluster bootstap when available.
|
||||||
|
*/
|
||||||
|
def apply(host: String, port: Option[Int], address: Option[InetAddress]): ResolvedTarget =
|
||||||
|
new ResolvedTarget(host, port, address)
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -53,10 +81,10 @@ object ServiceDiscovery {
|
||||||
* @param port optional port number
|
* @param port optional port number
|
||||||
* @param address optional IP address of the target. This is used during cluster bootstap when available.
|
* @param address optional IP address of the target. This is used during cluster bootstap when available.
|
||||||
*/
|
*/
|
||||||
final case class ResolvedTarget(
|
final class ResolvedTarget(
|
||||||
host: String,
|
val host: String,
|
||||||
port: Option[Int],
|
val port: Option[Int],
|
||||||
address: Option[InetAddress]
|
val address: Option[InetAddress]
|
||||||
) {
|
) {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -74,6 +102,22 @@ object ServiceDiscovery {
|
||||||
import scala.compat.java8.OptionConverters._
|
import scala.compat.java8.OptionConverters._
|
||||||
address.asJava
|
address.asJava
|
||||||
}
|
}
|
||||||
|
|
||||||
|
override def toString: String = s"ResolvedTarget($host,$port,$address)"
|
||||||
|
|
||||||
|
override def equals(obj: Any): Boolean = obj match {
|
||||||
|
case other: ResolvedTarget ⇒ host == other.host && port == other.port && address == other.address
|
||||||
|
case _ ⇒ false
|
||||||
|
}
|
||||||
|
|
||||||
|
override def hashCode(): Int = {
|
||||||
|
var result = HashCode.SEED
|
||||||
|
result = HashCode.hash(result, host)
|
||||||
|
result = HashCode.hash(result, port)
|
||||||
|
result = HashCode.hash(result, address)
|
||||||
|
result
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
@ -86,7 +130,10 @@ object ServiceDiscovery {
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
@ApiMayChange
|
@ApiMayChange
|
||||||
final case class Lookup(serviceName: String, portName: Option[String], protocol: Option[String]) {
|
final class Lookup(
|
||||||
|
val serviceName: String,
|
||||||
|
val portName: Option[String],
|
||||||
|
val protocol: Option[String]) {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Which port for a service e.g. Akka remoting or HTTP.
|
* Which port for a service e.g. Akka remoting or HTTP.
|
||||||
|
|
@ -99,6 +146,28 @@ final case class Lookup(serviceName: String, portName: Option[String], protocol:
|
||||||
* Maps to "protocol" for SRV records.
|
* Maps to "protocol" for SRV records.
|
||||||
*/
|
*/
|
||||||
def withProtocol(value: String): Lookup = copy(protocol = Some(value))
|
def withProtocol(value: String): Lookup = copy(protocol = Some(value))
|
||||||
|
|
||||||
|
private def copy(
|
||||||
|
serviceName: String = serviceName,
|
||||||
|
portName: Option[String] = portName,
|
||||||
|
protocol: Option[String] = protocol): Lookup =
|
||||||
|
new Lookup(serviceName, portName, protocol)
|
||||||
|
|
||||||
|
override def toString: String = s"Lookup($serviceName,$portName,$protocol)"
|
||||||
|
|
||||||
|
override def equals(obj: Any): Boolean = obj match {
|
||||||
|
case other: Lookup ⇒ serviceName == other.serviceName && portName == other.portName && protocol == other.protocol
|
||||||
|
case _ ⇒ false
|
||||||
|
}
|
||||||
|
|
||||||
|
override def hashCode(): Int = {
|
||||||
|
var result = HashCode.SEED
|
||||||
|
result = HashCode.hash(result, serviceName)
|
||||||
|
result = HashCode.hash(result, portName)
|
||||||
|
result = HashCode.hash(result, protocol)
|
||||||
|
result
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@ApiMayChange
|
@ApiMayChange
|
||||||
|
|
@ -111,6 +180,12 @@ case object Lookup {
|
||||||
*/
|
*/
|
||||||
def apply(serviceName: String): Lookup = new Lookup(serviceName, None, None)
|
def apply(serviceName: String): Lookup = new Lookup(serviceName, None, None)
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a service Lookup with `serviceName`, optional `portName` and optional `protocol`.
|
||||||
|
*/
|
||||||
|
def apply(serviceName: String, portName: Option[String], protocol: Option[String]): Lookup =
|
||||||
|
new Lookup(serviceName, portName, protocol)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Java API
|
* Java API
|
||||||
*
|
*
|
||||||
|
|
|
||||||
|
|
@ -45,7 +45,7 @@ private object DnsServiceDiscovery {
|
||||||
} else {
|
} else {
|
||||||
addresses
|
addresses
|
||||||
}
|
}
|
||||||
case other ⇒ im.Seq.empty[ResolvedTarget]
|
case _ ⇒ im.Seq.empty[ResolvedTarget]
|
||||||
}
|
}
|
||||||
|
|
||||||
Resolved(srvRequest, addresses)
|
Resolved(srvRequest, addresses)
|
||||||
|
|
@ -78,33 +78,32 @@ private[akka] class DnsServiceDiscovery(system: ExtendedActorSystem) extends Ser
|
||||||
if (ipString.startsWith("/")) ipString.tail else ipString
|
if (ipString.startsWith("/")) ipString.tail else ipString
|
||||||
|
|
||||||
override def lookup(lookup: Lookup, resolveTimeout: FiniteDuration): Future[Resolved] = {
|
override def lookup(lookup: Lookup, resolveTimeout: FiniteDuration): Future[Resolved] = {
|
||||||
lookup match {
|
if (lookup.portName.isDefined && lookup.protocol.isDefined) {
|
||||||
case Lookup(name, Some(portName), Some(protocol)) ⇒
|
val srvRequest = s"_${lookup.portName.get}._${lookup.protocol.get}.${lookup.serviceName}"
|
||||||
val srvRequest = s"_$portName._$protocol.$name"
|
log.debug("Lookup [{}] translated to SRV query [{}] as contains portName and protocol", lookup, srvRequest)
|
||||||
log.debug("Lookup [{}] translated to SRV query [{}] as contains portName and protocol", lookup, srvRequest)
|
dns.ask(DnsProtocol.Resolve(srvRequest, Srv))(resolveTimeout).map {
|
||||||
dns.ask(DnsProtocol.Resolve(srvRequest, Srv))(resolveTimeout).map {
|
case resolved: DnsProtocol.Resolved ⇒
|
||||||
case resolved: DnsProtocol.Resolved ⇒
|
log.debug("Resolved Dns.Resolved: {}", resolved)
|
||||||
log.debug("Resolved Dns.Resolved: {}", resolved)
|
srvRecordsToResolved(srvRequest, resolved)
|
||||||
srvRecordsToResolved(srvRequest, resolved)
|
case resolved ⇒
|
||||||
case resolved ⇒
|
log.warning("Resolved UNEXPECTED (resolving to Nil): {}", resolved.getClass)
|
||||||
log.warning("Resolved UNEXPECTED (resolving to Nil): {}", resolved.getClass)
|
Resolved(srvRequest, Nil)
|
||||||
Resolved(srvRequest, Nil)
|
}
|
||||||
}
|
} else {
|
||||||
case _ ⇒
|
log.debug("Lookup[{}] translated to A/AAAA lookup as does not have portName and protocol", lookup)
|
||||||
log.debug("Lookup[{}] translated to A/AAAA lookup as does not have portName and protocol", lookup)
|
dns.ask(DnsProtocol.Resolve(lookup.serviceName, Ip()))(resolveTimeout).map {
|
||||||
dns.ask(DnsProtocol.Resolve(lookup.serviceName, Ip()))(resolveTimeout).map {
|
case resolved: DnsProtocol.Resolved ⇒
|
||||||
case resolved: DnsProtocol.Resolved ⇒
|
log.debug("Resolved Dns.Resolved: {}", resolved)
|
||||||
log.debug("Resolved Dns.Resolved: {}", resolved)
|
val addresses = resolved.records.collect {
|
||||||
val addresses = resolved.records.collect {
|
case a: ARecord ⇒ ResolvedTarget(cleanIpString(a.ip.getHostAddress), None, Some(a.ip))
|
||||||
case a: ARecord ⇒ ResolvedTarget(cleanIpString(a.ip.getHostAddress), None, Some(a.ip))
|
case a: AAAARecord ⇒ ResolvedTarget(cleanIpString(a.ip.getHostAddress), None, Some(a.ip))
|
||||||
case a: AAAARecord ⇒ ResolvedTarget(cleanIpString(a.ip.getHostAddress), None, Some(a.ip))
|
}
|
||||||
}
|
Resolved(lookup.serviceName, addresses)
|
||||||
Resolved(lookup.serviceName, addresses)
|
case resolved ⇒
|
||||||
case resolved ⇒
|
log.warning("Resolved UNEXPECTED (resolving to Nil): {}", resolved.getClass)
|
||||||
log.warning("Resolved UNEXPECTED (resolving to Nil): {}", resolved.getClass)
|
Resolved(lookup.serviceName, Nil)
|
||||||
Resolved(lookup.serviceName, Nil)
|
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -78,7 +78,7 @@ class DnsDiscoverySpec extends AkkaSpec(DnsDiscoverySpec.config)
|
||||||
val result = discovery.lookup(name, resolveTimeout = 500.milliseconds).futureValue
|
val result = discovery.lookup(name, resolveTimeout = 500.milliseconds).futureValue
|
||||||
result.serviceName shouldEqual name
|
result.serviceName shouldEqual name
|
||||||
result.addresses.toSet shouldEqual Set(
|
result.addresses.toSet shouldEqual Set(
|
||||||
ResolvedTarget("192.168.1.20", None)
|
ResolvedTarget("192.168.1.20", None, Some(InetAddress.getByName("192.168.1.20")))
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue