diff --git a/serialization-jackson/src/main/resources/reference.conf b/serialization-jackson/src/main/resources/reference.conf index a9a1b3f188..083e254727 100644 --- a/serialization-jackson/src/main/resources/reference.conf +++ b/serialization-jackson/src/main/resources/reference.conf @@ -35,6 +35,17 @@ pekko.serialization.jackson { migrations { } + # Controls the Buffer Recycler Pool implementation used by Jackson. + # https://javadoc.io/static/com.fasterxml.jackson.core/jackson-core/2.16.2/com/fasterxml/jackson/core/util/JsonRecyclerPools.html + # The default is "thread-local" which is the same as the default in Jackson 2.16. + buffer-recycler { + # the supported values are "thread-local", "lock-free", "shared-lock-free", "concurrent-deque", + # "shared-concurrent-deque", "bounded" + pool-instance = "thread-local" + # the maximum size of bounded recycler pools - must be >=1 or an IllegalArgumentException will occur + # only applies to pool-instance type "bounded" + bounded-pool-size = 100 + } } #//#stream-read-constraints diff --git a/serialization-jackson/src/main/scala/org/apache/pekko/serialization/jackson/JacksonObjectMapperProvider.scala b/serialization-jackson/src/main/scala/org/apache/pekko/serialization/jackson/JacksonObjectMapperProvider.scala index a7a79f84f5..deae791ae6 100644 --- a/serialization-jackson/src/main/scala/org/apache/pekko/serialization/jackson/JacksonObjectMapperProvider.scala +++ b/serialization-jackson/src/main/scala/org/apache/pekko/serialization/jackson/JacksonObjectMapperProvider.scala @@ -30,6 +30,7 @@ import com.fasterxml.jackson.core.{ StreamWriteFeature } import com.fasterxml.jackson.core.json.{ JsonReadFeature, JsonWriteFeature } +import com.fasterxml.jackson.core.util.{ BufferRecycler, JsonRecyclerPools, RecyclerPool } import com.fasterxml.jackson.databind.{ DeserializationFeature, MapperFeature, @@ -103,10 +104,12 @@ object JacksonObjectMapperProvider extends ExtensionId[JacksonObjectMapperProvid // instead of using JsonFactoryBuilder (new in Jackson 2.10.0). factory.setStreamReadConstraints(streamReadConstraints) factory.setStreamWriteConstraints(streamWriteConstraints) + factory.setRecyclerPool(getBufferRecyclerPool(config)) case None => new JsonFactoryBuilder() .streamReadConstraints(streamReadConstraints) .streamWriteConstraints(streamWriteConstraints) + .recyclerPool(getBufferRecyclerPool(config)) .build() } @@ -153,6 +156,18 @@ object JacksonObjectMapperProvider extends ExtensionId[JacksonObjectMapperProvid jsonFactory } + private def getBufferRecyclerPool(cfg: Config): RecyclerPool[BufferRecycler] = { + cfg.getString("buffer-recycler.pool-instance") match { + case "thread-local" => JsonRecyclerPools.threadLocalPool() + case "lock-free" => JsonRecyclerPools.newLockFreePool() + case "shared-lock-free" => JsonRecyclerPools.sharedLockFreePool() + case "concurrent-deque" => JsonRecyclerPools.newConcurrentDequePool() + case "shared-concurrent-deque" => JsonRecyclerPools.sharedConcurrentDequePool() + case "bounded" => JsonRecyclerPools.newBoundedPool(cfg.getInt("buffer-recycler.bounded-pool-size")) + case other => throw new IllegalArgumentException(s"Unknown recycler-pool: $other") + } + } + @nowarn("msg=deprecated") private def configureObjectMapperFeatures( bindingName: String, diff --git a/serialization-jackson/src/test/scala/org/apache/pekko/serialization/jackson/JacksonFactorySpec.scala b/serialization-jackson/src/test/scala/org/apache/pekko/serialization/jackson/JacksonFactorySpec.scala index 272afadae4..090b408037 100644 --- a/serialization-jackson/src/test/scala/org/apache/pekko/serialization/jackson/JacksonFactorySpec.scala +++ b/serialization-jackson/src/test/scala/org/apache/pekko/serialization/jackson/JacksonFactorySpec.scala @@ -17,6 +17,7 @@ package org.apache.pekko.serialization.jackson +import com.fasterxml.jackson.core.util.JsonRecyclerPools.BoundedPool import com.typesafe.config.ConfigFactory import org.scalatest.BeforeAndAfterAll import org.scalatest.matchers.should.Matchers @@ -60,6 +61,7 @@ class JacksonFactorySpec extends TestKit(ActorSystem("JacksonFactorySpec")) streamReadConstraints.getMaxDocumentLength shouldEqual maxDocLen streamReadConstraints.getMaxNestingDepth shouldEqual maxNestingDepth } + "support StreamWriteConstraints" in { val bindingName = "testJackson" val maxNestingDepth = 54321 @@ -72,5 +74,31 @@ class JacksonFactorySpec extends TestKit(ActorSystem("JacksonFactorySpec")) val streamWriteConstraints = mapper.getFactory.streamWriteConstraints() streamWriteConstraints.getMaxNestingDepth shouldEqual maxNestingDepth } + + "support BufferRecycler (default)" in { + val bindingName = "testJackson" + val jacksonConfig = JacksonObjectMapperProvider.configForBinding(bindingName, defaultConfig) + val mapper = JacksonObjectMapperProvider.createObjectMapper( + bindingName, None, objectMapperFactory, jacksonConfig, dynamicAccess, None) + val recyclerPool = mapper.getFactory._getRecyclerPool() + recyclerPool.getClass.getSimpleName shouldEqual "ThreadLocalPool" + } + + "support BufferRecycler with config override" in { + val bindingName = "testJackson" + val poolInstance = "bounded" + val boundedPoolSize = 1234 + val config = ConfigFactory.parseString( + s"""pekko.serialization.jackson.buffer-recycler.pool-instance=$poolInstance + |pekko.serialization.jackson.buffer-recycler.bounded-pool-size=$boundedPoolSize + |""".stripMargin) + .withFallback(defaultConfig) + val jacksonConfig = JacksonObjectMapperProvider.configForBinding(bindingName, config) + val mapper = JacksonObjectMapperProvider.createObjectMapper( + bindingName, None, objectMapperFactory, jacksonConfig, dynamicAccess, None) + val recyclerPool = mapper.getFactory._getRecyclerPool() + recyclerPool.getClass.getSimpleName shouldEqual "BoundedPool" + recyclerPool.asInstanceOf[BoundedPool].capacity() shouldEqual boundedPoolSize + } } }