Artery hostname for client side of TCP connection, #28546

This commit is contained in:
Patrik Nordwall 2020-01-31 14:53:51 +01:00 committed by GitHub
parent 2677edf930
commit a6cc73b447
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 17 additions and 2 deletions

View file

@ -1087,6 +1087,9 @@ akka {
tcp { tcp {
# Timeout of establishing outbound connections. # Timeout of establishing outbound connections.
connection-timeout = 5 seconds connection-timeout = 5 seconds
# The local address that is used for the client side of the TCP connection.
outbound-client-hostname = ""
} }

View file

@ -223,6 +223,12 @@ private[akka] final class ArterySettings private (config: Config) {
val ConnectionTimeout: FiniteDuration = config val ConnectionTimeout: FiniteDuration = config
.getMillisDuration("connection-timeout") .getMillisDuration("connection-timeout")
.requiring(interval => interval > Duration.Zero, "connection-timeout must be more than zero") .requiring(interval => interval > Duration.Zero, "connection-timeout must be more than zero")
val OutboundClientHostname: Option[String] = {
config.getString("outbound-client-hostname") match {
case "" => None
case hostname => Some(hostname)
}
}
} }
} }

View file

@ -121,13 +121,17 @@ private[remote] class ArteryTcpTransport(
val port = outboundContext.remoteAddress.port.get val port = outboundContext.remoteAddress.port.get
val remoteAddress = InetSocketAddress.createUnresolved(host, port) val remoteAddress = InetSocketAddress.createUnresolved(host, port)
def connectionFlow: Flow[ByteString, ByteString, Future[Tcp.OutgoingConnection]] = def connectionFlow: Flow[ByteString, ByteString, Future[Tcp.OutgoingConnection]] = {
val localAddress = settings.Advanced.Tcp.OutboundClientHostname match {
case None => None
case Some(clientHostname) => Some(new InetSocketAddress(clientHostname, 0))
}
if (tlsEnabled) { if (tlsEnabled) {
val sslProvider = sslEngineProvider.get val sslProvider = sslEngineProvider.get
Tcp().outgoingConnectionWithTls( Tcp().outgoingConnectionWithTls(
remoteAddress, remoteAddress,
createSSLEngine = () => sslProvider.createClientSSLEngine(host, port), createSSLEngine = () => sslProvider.createClientSSLEngine(host, port),
localAddress = None, localAddress,
options = Nil, options = Nil,
connectTimeout = settings.Advanced.Tcp.ConnectionTimeout, connectTimeout = settings.Advanced.Tcp.ConnectionTimeout,
idleTimeout = Duration.Inf, idleTimeout = Duration.Inf,
@ -136,9 +140,11 @@ private[remote] class ArteryTcpTransport(
} else { } else {
Tcp().outgoingConnection( Tcp().outgoingConnection(
remoteAddress, remoteAddress,
localAddress,
halfClose = true, // issue https://github.com/akka/akka/issues/24392 if set to false halfClose = true, // issue https://github.com/akka/akka/issues/24392 if set to false
connectTimeout = settings.Advanced.Tcp.ConnectionTimeout) connectTimeout = settings.Advanced.Tcp.ConnectionTimeout)
} }
}
def connectionFlowWithRestart: Flow[ByteString, ByteString, NotUsed] = { def connectionFlowWithRestart: Flow[ByteString, ByteString, NotUsed] = {
val restartCount = new AtomicInteger(0) val restartCount = new AtomicInteger(0)