* change config structure to be able to support other compression algorithms, such as lz4 in the future * enable compression for json (as before) but disable it by default for cbor Co-Authored-By: Ignasi Marimon-Clos <ignasi@lightbend.com>
This commit is contained in:
parent
f2de57a057
commit
ae70a833fe
5 changed files with 109 additions and 34 deletions
|
|
@ -201,13 +201,15 @@ class JacksonSerializationBench {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
serialization.jackson {
|
serialization.jackson {
|
||||||
compress-larger-than = 100000 b
|
|
||||||
|
|
||||||
serialization-features {
|
serialization-features {
|
||||||
#WRITE_DATES_AS_TIMESTAMPS = off
|
#WRITE_DATES_AS_TIMESTAMPS = off
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
akka.serialization.jackson.jackson-json.compression {
|
||||||
|
algorithm = off
|
||||||
|
compress-larger-than = 100 b
|
||||||
|
}
|
||||||
""")
|
""")
|
||||||
|
|
||||||
system = ActorSystem("JacksonSerializationBench", config)
|
system = ActorSystem("JacksonSerializationBench", config)
|
||||||
|
|
|
||||||
|
|
@ -362,14 +362,20 @@ Java compiler option is enabled.
|
||||||
|
|
||||||
### Compression
|
### Compression
|
||||||
|
|
||||||
JSON can be rather verbose and for large messages it can be beneficial compress large payloads. Messages larger
|
JSON can be rather verbose and for large messages it can be beneficial to compress large payloads. For
|
||||||
than the following configuration are compressed with GZIP.
|
the `jackson-json` binding the default configuration is:
|
||||||
|
|
||||||
@@snip [reference.conf](/akka-serialization-jackson/src/main/resources/reference.conf) { #compression }
|
@@snip [reference.conf](/akka-serialization-jackson/src/main/resources/reference.conf) { #compression }
|
||||||
|
|
||||||
Compression can be disabled by setting this configuration property to `off`. It will still be able to decompress
|
Messages larger than the `compress-larger-than` property are compressed with GZIP.
|
||||||
|
|
||||||
|
Compression can be disabled by setting the `algorithm` property to `off`. It will still be able to decompress
|
||||||
payloads that were compressed when serialized, e.g. if this configuration is changed.
|
payloads that were compressed when serialized, e.g. if this configuration is changed.
|
||||||
|
|
||||||
|
For the `jackson-cbor` and custom bindings other than `jackson-json` compression is by default disabled,
|
||||||
|
but can be enabled in the same way as the configuration shown above but replacing `jackson-json` with
|
||||||
|
the binding name (for example `jackson-cbor`).
|
||||||
|
|
||||||
## Additional configuration
|
## Additional configuration
|
||||||
|
|
||||||
### Configuration per binding
|
### Configuration per binding
|
||||||
|
|
|
||||||
|
|
@ -22,14 +22,6 @@ akka.serialization.jackson {
|
||||||
}
|
}
|
||||||
#//#jackson-modules
|
#//#jackson-modules
|
||||||
|
|
||||||
#//#compression
|
|
||||||
akka.serialization.jackson {
|
|
||||||
# The serializer will compress the payload when it's larger than this value.
|
|
||||||
# Compression can be disabled with value 'off'.
|
|
||||||
compress-larger-than = 32 KiB
|
|
||||||
}
|
|
||||||
#//#compression
|
|
||||||
|
|
||||||
akka.serialization.jackson {
|
akka.serialization.jackson {
|
||||||
# When enabled and akka.loglevel=DEBUG serialization time and payload size
|
# When enabled and akka.loglevel=DEBUG serialization time and payload size
|
||||||
# is logged for each messages.
|
# is logged for each messages.
|
||||||
|
|
@ -140,6 +132,18 @@ akka.serialization.jackson {
|
||||||
# from `serialization-bindings`, but should still be possible to deserialize.
|
# from `serialization-bindings`, but should still be possible to deserialize.
|
||||||
whitelist-class-prefix = []
|
whitelist-class-prefix = []
|
||||||
|
|
||||||
|
# settings for compression of the payload
|
||||||
|
compression {
|
||||||
|
# Compression algorithm.
|
||||||
|
# - off : no compression
|
||||||
|
# - gzip : using common java gzip
|
||||||
|
algorithm = off
|
||||||
|
|
||||||
|
# If compression is enabled with the `algorithm` setting the payload is compressed
|
||||||
|
# when it's larger than this value.
|
||||||
|
compress-larger-than = 0 KiB
|
||||||
|
}
|
||||||
|
|
||||||
# Specific settings for jackson-json binding can be defined in this section to
|
# Specific settings for jackson-json binding can be defined in this section to
|
||||||
# override the settings in 'akka.serialization.jackson'
|
# override the settings in 'akka.serialization.jackson'
|
||||||
jackson-json {}
|
jackson-json {}
|
||||||
|
|
@ -151,6 +155,20 @@ akka.serialization.jackson {
|
||||||
}
|
}
|
||||||
#//#features
|
#//#features
|
||||||
|
|
||||||
|
#//#compression
|
||||||
|
# Compression settings for the jackson-json binding
|
||||||
|
akka.serialization.jackson.jackson-json.compression {
|
||||||
|
# Compression algorithm.
|
||||||
|
# - off : no compression
|
||||||
|
# - gzip : using common java gzip
|
||||||
|
algorithm = gzip
|
||||||
|
|
||||||
|
# If compression is enabled with the `algorithm` setting the payload is compressed
|
||||||
|
# when it's larger than this value.
|
||||||
|
compress-larger-than = 32 KiB
|
||||||
|
}
|
||||||
|
#//#compression
|
||||||
|
|
||||||
akka.actor {
|
akka.actor {
|
||||||
serializers {
|
serializers {
|
||||||
jackson-json = "akka.serialization.jackson.JacksonJsonSerializer"
|
jackson-json = "akka.serialization.jackson.JacksonJsonSerializer"
|
||||||
|
|
|
||||||
|
|
@ -80,6 +80,11 @@ import com.fasterxml.jackson.dataformat.cbor.CBORFactory
|
||||||
val disallowedSerializationBindings: Set[Class[_]] =
|
val disallowedSerializationBindings: Set[Class[_]] =
|
||||||
Set(classOf[java.io.Serializable], classOf[java.io.Serializable], classOf[java.lang.Comparable[_]])
|
Set(classOf[java.io.Serializable], classOf[java.io.Serializable], classOf[java.lang.Comparable[_]])
|
||||||
|
|
||||||
|
def isGZipped(bytes: Array[Byte]): Boolean = {
|
||||||
|
(bytes != null) && (bytes.length >= 2) &&
|
||||||
|
(bytes(0) == GZIPInputStream.GZIP_MAGIC.toByte) &&
|
||||||
|
(bytes(1) == (GZIPInputStream.GZIP_MAGIC >> 8).toByte)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -104,14 +109,21 @@ import com.fasterxml.jackson.dataformat.cbor.CBORFactory
|
||||||
bindingName,
|
bindingName,
|
||||||
JacksonObjectMapperProvider(system).getOrCreate(bindingName, Some(new CBORFactory)))
|
JacksonObjectMapperProvider(system).getOrCreate(bindingName, Some(new CBORFactory)))
|
||||||
|
|
||||||
|
@InternalApi object Compression {
|
||||||
|
sealed trait Algoritm
|
||||||
|
object Off extends Algoritm
|
||||||
|
final case class GZip(largerThan: Long) extends Algoritm
|
||||||
|
// TODO add LZ4, issue #27066
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* INTERNAL API: Base class for Jackson serializers.
|
* INTERNAL API: Base class for Jackson serializers.
|
||||||
*
|
*
|
||||||
* Configuration in `akka.serialization.jackson` section.
|
* Configuration in `akka.serialization.jackson` section.
|
||||||
* It will load Jackson modules defined in configuration `jackson-modules`.
|
* It will load Jackson modules defined in configuration `jackson-modules`.
|
||||||
*
|
*
|
||||||
* It will compress the payload if the the payload is larger than the configured
|
* It will compress the payload if the compression `algorithm` is enabled and the the
|
||||||
* `compress-larger-than` value.
|
* payload is larger than the configured `compress-larger-than` value.
|
||||||
*/
|
*/
|
||||||
@InternalApi private[akka] abstract class JacksonSerializer(
|
@InternalApi private[akka] abstract class JacksonSerializer(
|
||||||
val system: ExtendedActorSystem,
|
val system: ExtendedActorSystem,
|
||||||
|
|
@ -119,6 +131,7 @@ import com.fasterxml.jackson.dataformat.cbor.CBORFactory
|
||||||
val objectMapper: ObjectMapper)
|
val objectMapper: ObjectMapper)
|
||||||
extends SerializerWithStringManifest {
|
extends SerializerWithStringManifest {
|
||||||
import JacksonSerializer.GadgetClassBlacklist
|
import JacksonSerializer.GadgetClassBlacklist
|
||||||
|
import JacksonSerializer.isGZipped
|
||||||
|
|
||||||
// TODO issue #27107: it should be possible to implement ByteBufferSerializer as well, using Jackson's
|
// TODO issue #27107: it should be possible to implement ByteBufferSerializer as well, using Jackson's
|
||||||
// ByteBufferBackedOutputStream/ByteBufferBackedInputStream
|
// ByteBufferBackedOutputStream/ByteBufferBackedInputStream
|
||||||
|
|
@ -127,11 +140,16 @@ import com.fasterxml.jackson.dataformat.cbor.CBORFactory
|
||||||
private val conf = JacksonObjectMapperProvider.configForBinding(bindingName, system.settings.config)
|
private val conf = JacksonObjectMapperProvider.configForBinding(bindingName, system.settings.config)
|
||||||
private val isDebugEnabled = conf.getBoolean("verbose-debug-logging") && log.isDebugEnabled
|
private val isDebugEnabled = conf.getBoolean("verbose-debug-logging") && log.isDebugEnabled
|
||||||
private final val BufferSize = 1024 * 4
|
private final val BufferSize = 1024 * 4
|
||||||
private val compressLargerThan: Long = {
|
private val compressionAlgorithm: Compression.Algoritm = {
|
||||||
val key = "compress-larger-than"
|
toRootLowerCase(conf.getString("compression.algorithm")) match {
|
||||||
toRootLowerCase(conf.getString(key)) match {
|
case "off" => Compression.Off
|
||||||
case "off" => Long.MaxValue
|
case "gzip" =>
|
||||||
case _ => conf.getBytes(key)
|
val compressLargerThan = conf.getBytes("compression.compress-larger-than")
|
||||||
|
Compression.GZip(compressLargerThan)
|
||||||
|
case other =>
|
||||||
|
throw new IllegalArgumentException(
|
||||||
|
s"Unknown compression algorithm [$other], possible values are " +
|
||||||
|
""""off" or "gzip"""")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
private val migrations: Map[String, JacksonMigration] = {
|
private val migrations: Map[String, JacksonMigration] = {
|
||||||
|
|
@ -171,9 +189,7 @@ import com.fasterxml.jackson.dataformat.cbor.CBORFactory
|
||||||
checkAllowedSerializationBindings()
|
checkAllowedSerializationBindings()
|
||||||
val startTime = if (isDebugEnabled) System.nanoTime else 0L
|
val startTime = if (isDebugEnabled) System.nanoTime else 0L
|
||||||
val bytes = objectMapper.writeValueAsBytes(obj)
|
val bytes = objectMapper.writeValueAsBytes(obj)
|
||||||
val result =
|
val result = compress(bytes)
|
||||||
if (bytes.length > compressLargerThan) compress(bytes)
|
|
||||||
else bytes
|
|
||||||
|
|
||||||
logToBinaryDuration(obj, startTime, bytes, result)
|
logToBinaryDuration(obj, startTime, bytes, result)
|
||||||
|
|
||||||
|
|
@ -202,7 +218,6 @@ import com.fasterxml.jackson.dataformat.cbor.CBORFactory
|
||||||
override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = {
|
override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = {
|
||||||
checkAllowedSerializationBindings()
|
checkAllowedSerializationBindings()
|
||||||
val startTime = if (isDebugEnabled) System.nanoTime else 0L
|
val startTime = if (isDebugEnabled) System.nanoTime else 0L
|
||||||
val compressed = isGZipped(bytes)
|
|
||||||
|
|
||||||
val (fromVersion, manifestClassName) = parseManifest(manifest)
|
val (fromVersion, manifestClassName) = parseManifest(manifest)
|
||||||
checkAllowedClassName(manifestClassName)
|
checkAllowedClassName(manifestClassName)
|
||||||
|
|
@ -243,18 +258,18 @@ import com.fasterxml.jackson.dataformat.cbor.CBORFactory
|
||||||
}
|
}
|
||||||
checkAllowedClass(clazz)
|
checkAllowedClass(clazz)
|
||||||
|
|
||||||
val decompressBytes = if (compressed) decompress(bytes) else bytes
|
val decompressedBytes = decompress(bytes)
|
||||||
|
|
||||||
val result = migration match {
|
val result = migration match {
|
||||||
case Some(transformer) if fromVersion < transformer.currentVersion =>
|
case Some(transformer) if fromVersion < transformer.currentVersion =>
|
||||||
val jsonTree = objectMapper.readTree(decompressBytes)
|
val jsonTree = objectMapper.readTree(decompressedBytes)
|
||||||
val newJsonTree = transformer.transform(fromVersion, jsonTree)
|
val newJsonTree = transformer.transform(fromVersion, jsonTree)
|
||||||
objectMapper.treeToValue(newJsonTree, clazz)
|
objectMapper.treeToValue(newJsonTree, clazz)
|
||||||
case _ =>
|
case _ =>
|
||||||
objectMapper.readValue(decompressBytes, clazz)
|
objectMapper.readValue(decompressedBytes, clazz)
|
||||||
}
|
}
|
||||||
|
|
||||||
logFromBinaryDuration(bytes, decompressBytes, startTime, clazz)
|
logFromBinaryDuration(bytes, decompressedBytes, startTime, clazz)
|
||||||
|
|
||||||
result
|
result
|
||||||
|
|
||||||
|
|
@ -383,6 +398,14 @@ import com.fasterxml.jackson.dataformat.cbor.CBORFactory
|
||||||
}
|
}
|
||||||
|
|
||||||
def compress(bytes: Array[Byte]): Array[Byte] = {
|
def compress(bytes: Array[Byte]): Array[Byte] = {
|
||||||
|
compressionAlgorithm match {
|
||||||
|
case Compression.Off => bytes
|
||||||
|
case Compression.GZip(largerThan) =>
|
||||||
|
if (bytes.length > largerThan) compressGzip(bytes) else bytes
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private def compressGzip(bytes: Array[Byte]): Array[Byte] = {
|
||||||
val bos = new ByteArrayOutputStream(BufferSize)
|
val bos = new ByteArrayOutputStream(BufferSize)
|
||||||
val zip = new GZIPOutputStream(bos)
|
val zip = new GZIPOutputStream(bos)
|
||||||
try zip.write(bytes)
|
try zip.write(bytes)
|
||||||
|
|
@ -391,6 +414,13 @@ import com.fasterxml.jackson.dataformat.cbor.CBORFactory
|
||||||
}
|
}
|
||||||
|
|
||||||
def decompress(bytes: Array[Byte]): Array[Byte] = {
|
def decompress(bytes: Array[Byte]): Array[Byte] = {
|
||||||
|
if (isGZipped(bytes))
|
||||||
|
decompressGzip(bytes)
|
||||||
|
else
|
||||||
|
bytes
|
||||||
|
}
|
||||||
|
|
||||||
|
private def decompressGzip(bytes: Array[Byte]): Array[Byte] = {
|
||||||
val in = new GZIPInputStream(new ByteArrayInputStream(bytes))
|
val in = new GZIPInputStream(new ByteArrayInputStream(bytes))
|
||||||
val out = new ByteArrayOutputStream()
|
val out = new ByteArrayOutputStream()
|
||||||
val buffer = new Array[Byte](BufferSize)
|
val buffer = new Array[Byte](BufferSize)
|
||||||
|
|
@ -407,9 +437,4 @@ import com.fasterxml.jackson.dataformat.cbor.CBORFactory
|
||||||
out.toByteArray
|
out.toByteArray
|
||||||
}
|
}
|
||||||
|
|
||||||
def isGZipped(bytes: Array[Byte]): Boolean = {
|
|
||||||
(bytes != null) && (bytes.length >= 2) &&
|
|
||||||
(bytes(0) == GZIPInputStream.GZIP_MAGIC.toByte) &&
|
|
||||||
(bytes(1) == (GZIPInputStream.GZIP_MAGIC >> 8).toByte)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -110,7 +110,13 @@ class ScalaTestEventMigration extends JacksonMigration {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
class JacksonCborSerializerSpec extends JacksonSerializerSpec("jackson-cbor")
|
class JacksonCborSerializerSpec extends JacksonSerializerSpec("jackson-cbor") {
|
||||||
|
"have compression disabled by default" in {
|
||||||
|
val conf = JacksonObjectMapperProvider.configForBinding("jackson-cbor", system.settings.config)
|
||||||
|
val compressionAlgo = conf.getString("compression.algorithm")
|
||||||
|
compressionAlgo should ===("off")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@silent // this test uses Jackson deprecated APIs
|
@silent // this test uses Jackson deprecated APIs
|
||||||
class JacksonJsonSerializerSpec extends JacksonSerializerSpec("jackson-json") {
|
class JacksonJsonSerializerSpec extends JacksonSerializerSpec("jackson-json") {
|
||||||
|
|
@ -453,6 +459,24 @@ class JacksonJsonSerializerSpec extends JacksonSerializerSpec("jackson-json") {
|
||||||
|
|
||||||
deserializeFromJsonString(json, serializer.identifier, serializer.manifest(expected)) should ===(expected)
|
deserializeFromJsonString(json, serializer.identifier, serializer.manifest(expected)) should ===(expected)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
"compress large payload with gzip" in {
|
||||||
|
val conf = JacksonObjectMapperProvider.configForBinding("jackson-json", system.settings.config)
|
||||||
|
val compressionAlgo = conf.getString("compression.algorithm")
|
||||||
|
compressionAlgo should ===("gzip")
|
||||||
|
val compressLargerThan = conf.getBytes("compression.compress-larger-than")
|
||||||
|
compressLargerThan should ===(32 * 1024)
|
||||||
|
val msg = SimpleCommand("0" * (compressLargerThan + 1).toInt)
|
||||||
|
val bytes = serializeToBinary(msg)
|
||||||
|
JacksonSerializer.isGZipped(bytes) should ===(true)
|
||||||
|
bytes.length should be < compressLargerThan.toInt
|
||||||
|
}
|
||||||
|
|
||||||
|
"not compress small payload with gzip" in {
|
||||||
|
val msg = SimpleCommand("0" * 1000)
|
||||||
|
val bytes = serializeToBinary(msg)
|
||||||
|
JacksonSerializer.isGZipped(bytes) should ===(false)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue