=remote Make ues of Netty's default resolver.

This commit is contained in:
He-Pin 2023-09-16 04:06:46 +08:00 committed by kerr
parent efaa82c966
commit 62bf7cbc7d

View file

@ -463,22 +463,24 @@ class NettyTransport(val settings: NettyTransportSettings, val system: ExtendedA
override def isResponsibleFor(address: Address): Boolean = true // TODO: Add configurable subnet filtering
// TODO: This should be factored out to an async (or thread-isolated) name lookup service #2960
// Keep this for binary compatibility reasons
def addressToSocketAddress(addr: Address): Future[InetSocketAddress] = addr match {
case Address(_, _, Some(host), Some(port)) =>
Future { blocking { new InetSocketAddress(InetAddress.getByName(host), port) } }
case _ => Future.failed(new IllegalArgumentException(s"Address [$addr] does not contain host or port information."))
Future {
blocking {
new InetSocketAddress(InetAddress.getByName(host), port)
}
}
case _ =>
Future.failed(new IllegalArgumentException(s"Address [$addr] must contain both host and port information."))
}
override def listen: Future[(Address, Promise[AssociationEventListener])] = {
@nowarn("msg=deprecated")
val bindPort = settings.BindPortSelector
for {
address <- addressToSocketAddress(Address("", "", settings.BindHostname, bindPort))
} yield {
Future.fromTry(Try {
try {
val newServerChannel = inboundBootstrap.bind(address).sync().channel()
val newServerChannel = inboundBootstrap.bind(settings.BindHostname, bindPort).sync().channel()
// Block reads until a handler actor is registered
newServerChannel.config().setAutoRead(false)
channelGroup.add(newServerChannel)
@ -511,28 +513,34 @@ class NettyTransport(val settings: NettyTransportSettings, val system: ExtendedA
s"Unknown local address type [${newServerChannel.localAddress().getClass.getName}]")
}
} catch {
case NonFatal(e) => {
log.error("failed to bind to {}, shutting down Netty transport", address)
case NonFatal(e) =>
log.error("failed to bind to host:{} port:{}, shutting down Netty transport", settings.BindHostname, bindPort)
try {
shutdown()
} catch { case NonFatal(_) => } // ignore possible exception during shutdown
throw e
}
} catch {
case NonFatal(_) =>
} // ignore possible exception during shutdown
throw e;
}
}
})
}
// Need to do like this for binary compatibility reasons
private[pekko] def boundAddress = boundTo
private def extractHostAndPort(addr: Address): (String, Int) = addr match {
case Address(_, _, Some(host), Some(port)) => (host, port)
case _ => throw new IllegalArgumentException(s"Address [$addr] must contain both host and port information.")
}
override def associate(remoteAddress: Address): Future[AssociationHandle] = {
if (!serverChannel.isActive) Future.failed(new NettyTransportException("Transport is not bound"))
else {
val bootstrap: ClientBootstrap = outboundBootstrap(remoteAddress)
(for {
socketAddress <- addressToSocketAddress(remoteAddress)
readyChannel <- NettyFutureBridge(bootstrap.connect(socketAddress)).map { channel =>
(host, port) <- Future.fromTry(Try(extractHostAndPort(remoteAddress)))
readyChannel <- NettyFutureBridge(bootstrap.connect(host, port)).map { channel =>
if (EnableSsl)
blocking {
channel.pipeline().get(classOf[SslHandler]).handshakeFuture().awaitUninterruptibly()