From 7426c8a1f495cf14c021ede7bea20e631991350f Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Sat, 9 Sep 2023 00:30:31 +0100 Subject: [PATCH] simplify some NettyTransport code (#639) fix imports scalafmt --- .../transport/netty/NettyTransport.scala | 71 +++++++------------ 1 file changed, 27 insertions(+), 44 deletions(-) diff --git a/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettyTransport.scala b/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettyTransport.scala index daa804acf5..b33dbff3c4 100644 --- a/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettyTransport.scala +++ b/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettyTransport.scala @@ -13,59 +13,46 @@ package org.apache.pekko.remote.transport.netty -import java.net.InetAddress -import java.net.InetSocketAddress -import java.net.SocketAddress -import java.util.concurrent.CancellationException -import java.util.concurrent.ConcurrentHashMap -import java.util.concurrent.Executors +import java.net.{ InetAddress, InetSocketAddress, SocketAddress } +import java.util.concurrent.{ CancellationException, ConcurrentHashMap, Executors } import java.util.concurrent.atomic.AtomicInteger import scala.annotation.nowarn -import scala.concurrent.ExecutionContext -import scala.concurrent.Future -import scala.concurrent.Promise -import scala.concurrent.blocking +import scala.concurrent.{ blocking, ExecutionContext, Future, Promise } import scala.concurrent.duration.FiniteDuration import scala.util.Try -import scala.util.control.NoStackTrace -import scala.util.control.NonFatal +import scala.util.control.{ NoStackTrace, NonFatal } import com.typesafe.config.Config -import org.apache.pekko -import org.jboss.netty.bootstrap.Bootstrap -import org.jboss.netty.bootstrap.ClientBootstrap -import org.jboss.netty.bootstrap.ConnectionlessBootstrap -import org.jboss.netty.bootstrap.ServerBootstrap -import org.jboss.netty.buffer.ChannelBuffer -import org.jboss.netty.buffer.ChannelBuffers +import org.jboss.netty.bootstrap.{ Bootstrap, ClientBootstrap, ServerBootstrap } +import org.jboss.netty.buffer.{ ChannelBuffer, ChannelBuffers } import org.jboss.netty.channel._ -import org.jboss.netty.channel.group.ChannelGroup -import org.jboss.netty.channel.group.ChannelGroupFuture -import org.jboss.netty.channel.group.ChannelGroupFutureListener -import org.jboss.netty.channel.group.DefaultChannelGroup -import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory -import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory -import org.jboss.netty.channel.socket.nio.NioWorkerPool -import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder -import org.jboss.netty.handler.codec.frame.LengthFieldPrepender +import org.jboss.netty.channel.group.{ + ChannelGroup, + ChannelGroupFuture, + ChannelGroupFutureListener, + DefaultChannelGroup +} +import org.jboss.netty.channel.socket.nio.{ + NioClientSocketChannelFactory, + NioServerSocketChannelFactory, + NioWorkerPool +} +import org.jboss.netty.handler.codec.frame.{ LengthFieldBasedFrameDecoder, LengthFieldPrepender } import org.jboss.netty.handler.ssl.SslHandler import org.jboss.netty.util.HashedWheelTimer +import org.apache.pekko import pekko.ConfigurationException import pekko.OnlyCauseStackTrace -import pekko.actor.ActorSystem -import pekko.actor.Address -import pekko.actor.ExtendedActorSystem +import pekko.actor.{ ActorSystem, Address, ExtendedActorSystem } import pekko.dispatch.ThreadPoolConfig import pekko.event.Logging import pekko.remote.RARP -import pekko.remote.transport.AssociationHandle -import pekko.remote.transport.AssociationHandle.HandleEventListener -import pekko.remote.transport.Transport -import pekko.remote.transport.Transport._ -import pekko.util.Helpers -import pekko.util.Helpers.Requiring -import pekko.util.OptionVal +import pekko.remote.transport.{ AssociationHandle, Transport } +import pekko.util.{ Helpers, OptionVal } +import AssociationHandle.HandleEventListener +import Transport._ +import Helpers.Requiring @deprecated("Classic remoting is deprecated, use Artery", "Akka 2.6.0") object NettyFutureBridge { @@ -481,7 +468,7 @@ class NettyTransport(val settings: NettyTransportSettings, val system: ExtendedA bootstrap } - private val inboundBootstrap: Bootstrap = { + private val inboundBootstrap: ServerBootstrap = { setupBootstrap(new ServerBootstrap(serverChannelFactory), serverPipelineFactory) } @@ -514,11 +501,7 @@ class NettyTransport(val settings: NettyTransportSettings, val system: ExtendedA address <- addressToSocketAddress(Address("", "", settings.BindHostname, bindPort)) } yield { try { - val newServerChannel = inboundBootstrap match { - case b: ServerBootstrap => b.bind(address) - case b: ConnectionlessBootstrap => b.bind(address) - case _ => throw new IllegalStateException() // won't happen, compiler exhaustiveness check pleaser - } + val newServerChannel = inboundBootstrap.bind(address) // Block reads until a handler actor is registered newServerChannel.setReadable(false)