=remote Drop the blocking usage.

This commit is contained in:
He-Pin 2023-09-16 14:03:33 +08:00 committed by kerr
parent 62bf7cbc7d
commit 9288d4c6f1

View file

@ -68,6 +68,17 @@ object NettyFutureBridge {
p.future
}
private[transport] def apply[T](nettyFuture: io.netty.util.concurrent.Future[T]): Future[T] = {
val p = Promise[T]()
nettyFuture.addListener((future: io.netty.util.concurrent.Future[T]) =>
p.complete(
Try(
if (future.isSuccess) future.get()
else if (future.isCancelled) throw new CancellationException
else throw future.cause())))
p.future
}
def apply(nettyFuture: ChannelGroupFuture): Future[ChannelGroup] = {
import pekko.util.ccompat.JavaConverters._
val p = Promise[ChannelGroup]()
@ -540,14 +551,10 @@ class NettyTransport(val settings: NettyTransportSettings, val system: ExtendedA
(for {
(host, port) <- Future.fromTry(Try(extractHostAndPort(remoteAddress)))
readyChannel <- NettyFutureBridge(bootstrap.connect(host, port)).map { channel =>
if (EnableSsl)
blocking {
channel.pipeline().get(classOf[SslHandler]).handshakeFuture().awaitUninterruptibly()
}
channel.config.setAutoRead(false)
channel
}
channel <- NettyFutureBridge(bootstrap.connect(host, port))
readyChannel <- if (EnableSsl) {
NettyFutureBridge(channel.pipeline().get(classOf[SslHandler]).handshakeFuture())
} else Future.successful(channel)
handle <- readyChannel.pipeline().get(classOf[ClientHandler]).statusFuture
} yield handle).recover {
case _: CancellationException => throw new NettyTransportExceptionNoStack("Connection was cancelled")