chore: Add support for controlling the NettyTransport's byteBuf allocator type. (#1707)

* chore: Add support for controlling the NettyTransport's byteBuf allocator type.

* chore: extract deriveByteBufAllocator method
This commit is contained in:
He-Pin(kerr) 2025-01-14 18:41:35 +08:00 committed by GitHub
parent 9844a1befa
commit dbc9ed3a99
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 73 additions and 12 deletions

View file

@ -621,6 +621,14 @@ pekko {
# "off-for-windows" of course means that it's "on" for all other platforms
tcp-reuse-addr = off-for-windows
# Used to control the Netty 4's ByteBufAllocator. The default is "pooled".
# pooled : use a PooledByteBufAllocator.DEFAULT
# unpooled : use an UnpooledByteBufAllocator.DEFAULT
# unpooled-heap : use an UnpooledByteBufAllocator with prefer direct `false`
# adaptive : use an AdaptiveByteBufAllocator
# adaptive-heap : use an AdaptiveByteBufAllocator with prefer direct `false`
bytebuf-allocator-type = "pooled"
# Used to configure the number of I/O worker threads on server sockets
server-socket-worker-pool {
# Min number of threads to cap factor-based number to

View file

@ -24,6 +24,7 @@ import scala.util.Try
import scala.util.control.{ NoStackTrace, NonFatal }
import com.typesafe.config.Config
import org.apache.pekko
import pekko.ConfigurationException
import pekko.OnlyCauseStackTrace
@ -38,7 +39,13 @@ import pekko.util.Helpers.Requiring
import pekko.util.{ Helpers, OptionVal }
import io.netty.bootstrap.{ Bootstrap => ClientBootstrap, ServerBootstrap }
import io.netty.buffer.Unpooled
import io.netty.buffer.{
AdaptiveByteBufAllocator,
ByteBufAllocator,
PooledByteBufAllocator,
Unpooled,
UnpooledByteBufAllocator
}
import io.netty.channel.{
Channel,
ChannelFuture,
@ -160,6 +167,8 @@ class NettyTransportSettings(config: Config) {
case _ => getBoolean("tcp-reuse-addr")
}
val ByteBufAllocator: ByteBufAllocator = NettyTransport.deriveByteBufAllocator(getString("bytebuf-allocator-type"))
val Hostname: String = getString("hostname") match {
case "" => InetAddress.getLocalHost.getHostAddress
case value => value
@ -318,6 +327,17 @@ private[transport] object NettyTransport {
systemName: String,
hostName: Option[String]): Option[Address] =
addressFromSocketAddress(addr, schemeIdentifier, systemName, hostName, port = None)
def deriveByteBufAllocator(allocatorType: String): ByteBufAllocator = allocatorType match {
case "pooled" => PooledByteBufAllocator.DEFAULT
case "unpooled" => UnpooledByteBufAllocator.DEFAULT
case "unpooled-heap" => new UnpooledByteBufAllocator(false)
case "adaptive" => new AdaptiveByteBufAllocator()
case "adaptive-heap" => new AdaptiveByteBufAllocator(false)
case other => throw new IllegalArgumentException(
"Unknown 'bytebuf-allocator-type' [" + other + "]," +
" supported values are 'pooled', 'unpooled', 'unpooled-heap', 'adaptive', 'adaptive-heap'.")
}
}
@deprecated("Classic remoting is deprecated, use Artery", "Akka 2.6.0")
@ -442,6 +462,10 @@ class NettyTransport(val settings: NettyTransportSettings, val system: ExtendedA
bootstrap.childOption[java.lang.Boolean](ChannelOption.TCP_NODELAY, settings.TcpNodelay)
bootstrap.childOption[java.lang.Boolean](ChannelOption.SO_KEEPALIVE, settings.TcpKeepalive)
// Use the same allocator for inbound and outbound buffers
bootstrap.option(ChannelOption.ALLOCATOR, settings.ByteBufAllocator)
bootstrap.childOption(ChannelOption.ALLOCATOR, settings.ByteBufAllocator)
settings.ReceiveBufferSize.foreach(sz => bootstrap.childOption[java.lang.Integer](ChannelOption.SO_RCVBUF, sz))
settings.SendBufferSize.foreach(sz => bootstrap.childOption[java.lang.Integer](ChannelOption.SO_SNDBUF, sz))
settings.WriteBufferHighWaterMark.filter(_ > 0).foreach(sz =>

View file

@ -13,9 +13,11 @@
package org.apache.pekko.remote
import scala.concurrent.duration._
import io.netty.buffer.PooledByteBufAllocator
import scala.concurrent.duration._
import scala.annotation.nowarn
import language.postfixOps
import org.apache.pekko
@ -103,6 +105,7 @@ class RemoteConfigSpec extends PekkoSpec("""
TcpNodelay should ===(true)
TcpKeepalive should ===(true)
TcpReuseAddr should ===(!Helpers.isWindows)
ByteBufAllocator should ===(PooledByteBufAllocator.DEFAULT)
c.getString("hostname") should ===("")
c.getString("bind-hostname") should ===("")
c.getString("bind-port") should ===("")

View file

@ -24,7 +24,7 @@ import org.scalatest.wordspec.AnyWordSpec
import org.apache.pekko
import pekko.actor.{ ActorSystem, Address }
import pekko.remote.classic.transport.netty.NettyTransportSpec._
import pekko.remote.transport.NettyTransportSpec._
import pekko.testkit.SocketUtil
trait BindCanonicalAddressBehaviors {

View file

@ -11,23 +11,25 @@
* Copyright (C) 2018-2022 Lightbend Inc. <https://www.lightbend.com>
*/
package org.apache.pekko.remote.classic.transport.netty
import java.net.{ InetAddress, InetSocketAddress }
import java.nio.channels.ServerSocketChannel
import scala.concurrent.Await
import scala.concurrent.duration.Duration
package org.apache.pekko.remote.transport
import com.typesafe.config.ConfigFactory
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec
import io.netty.buffer.{ AdaptiveByteBufAllocator, PooledByteBufAllocator, UnpooledByteBufAllocator }
import org.apache.pekko
import pekko.actor.{ ActorSystem, Address, ExtendedActorSystem }
import pekko.remote.BoundAddressesExtension
import pekko.remote.transport.netty.NettyTransport.deriveByteBufAllocator
import pekko.testkit.SocketUtil
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec
import java.net.{ InetAddress, InetSocketAddress }
import java.nio.channels.ServerSocketChannel
import scala.concurrent.Await
import scala.concurrent.duration.Duration
object NettyTransportSpec {
val commonConfig = ConfigFactory.parseString("""
pekko.actor.provider = remote
@ -132,6 +134,30 @@ class NettyTransportSpec extends AnyWordSpec with Matchers with BindBehavior {
Await.result(sys.terminate(), Duration.Inf)
}
"be able to specify byte buffer allocator" in {
deriveByteBufAllocator("pooled") should ===(PooledByteBufAllocator.DEFAULT)
deriveByteBufAllocator("unpooled") should ===(UnpooledByteBufAllocator.DEFAULT)
{
val allocator = deriveByteBufAllocator("unpooled-heap")
allocator shouldBe a[UnpooledByteBufAllocator]
allocator.toString.contains("directByDefault: false") should ===(true)
}
{
val allocator = deriveByteBufAllocator("adaptive")
allocator shouldBe a[AdaptiveByteBufAllocator]
allocator.toString.contains("directByDefault: true") should ===(true)
}
{
val allocator = deriveByteBufAllocator("adaptive-heap")
allocator shouldBe a[AdaptiveByteBufAllocator]
allocator.toString.contains("directByDefault: false") should ===(true)
}
}
}
}