diff --git a/akka-bench-jmh/src/main/scala/akka/serialization/jackson/JacksonSerializationBench.scala b/akka-bench-jmh/src/main/scala/akka/serialization/jackson/JacksonSerializationBench.scala
index 2c720b51d6..7f55e2e3cc 100644
--- a/akka-bench-jmh/src/main/scala/akka/serialization/jackson/JacksonSerializationBench.scala
+++ b/akka-bench-jmh/src/main/scala/akka/serialization/jackson/JacksonSerializationBench.scala
@@ -201,13 +201,15 @@ class JacksonSerializationBench {
           }
         }
         serialization.jackson {
-          compress-larger-than = 100000 b
-
           serialization-features {
             #WRITE_DATES_AS_TIMESTAMPS = off
           }
         }
       }
+      akka.serialization.jackson.jackson-json.compression {
+        algorithm = off
+        compress-larger-than = 100 b
+      }
       """)
 
     system = ActorSystem("JacksonSerializationBench", config)
diff --git a/akka-docs/src/main/paradox/serialization-jackson.md b/akka-docs/src/main/paradox/serialization-jackson.md
index 0a01bdb0d2..5e7b333521 100644
--- a/akka-docs/src/main/paradox/serialization-jackson.md
+++ b/akka-docs/src/main/paradox/serialization-jackson.md
@@ -362,14 +362,20 @@ Java compiler option is enabled.
 
 ### Compression
 
-JSON can be rather verbose and for large messages it can be beneficial compress large payloads. Messages larger
-than the following configuration are compressed with GZIP.
+JSON can be rather verbose and for large messages it can be beneficial to compress the payload. For
+the `jackson-json` binding the default configuration is:
 
 @@snip [reference.conf](/akka-serialization-jackson/src/main/resources/reference.conf) { #compression }
 
-Compression can be disabled by setting this configuration property to `off`. It will still be able to decompress
+Messages larger than the configured `compress-larger-than` size are compressed with GZIP.
+
+Compression can be disabled by setting the `algorithm` property to `off`. The serializer will still be able to decompress
 payloads that were compressed when serialized, e.g. if this configuration is changed.
 
+For the `jackson-cbor` binding and custom bindings other than `jackson-json`, compression is disabled by
+default. It can be enabled with the same configuration as shown above, replacing `jackson-json` with
+the binding name (for example `jackson-cbor`).
+
 ## Additional configuration
 
 ### Configuration per binding
diff --git a/akka-serialization-jackson/src/main/resources/reference.conf b/akka-serialization-jackson/src/main/resources/reference.conf
index f6ad03db2d..3b4c5d6174 100644
--- a/akka-serialization-jackson/src/main/resources/reference.conf
+++ b/akka-serialization-jackson/src/main/resources/reference.conf
@@ -22,14 +22,6 @@ akka.serialization.jackson {
 }
 #//#jackson-modules
 
-#//#compression
-akka.serialization.jackson {
-  # The serializer will compress the payload when it's larger than this value.
-  # Compression can be disabled with value 'off'.
-  compress-larger-than = 32 KiB
-}
-#//#compression
-
 akka.serialization.jackson {
   # When enabled and akka.loglevel=DEBUG serialization time and payload size
   # is logged for each messages.
@@ -140,6 +132,18 @@ akka.serialization.jackson {
   # from `serialization-bindings`, but should still be possible to deserialize.
   whitelist-class-prefix = []
 
+  # Settings for compression of the payload
+  compression {
+    # Compression algorithm.
+    # - off  : no compression
+    # - gzip : using common java gzip
+    algorithm = off
+
+    # If compression is enabled with the `algorithm` setting, the payload is compressed
+    # when it's larger than this value.
+    compress-larger-than = 0 KiB
+  }
+
   # Specific settings for jackson-json binding can be defined in this section to
   # override the settings in 'akka.serialization.jackson'
   jackson-json {}
@@ -151,6 +155,20 @@ akka.serialization.jackson {
 }
 #//#features
 
+#//#compression
+# Compression settings for the jackson-json binding
+akka.serialization.jackson.jackson-json.compression {
+  # Compression algorithm.
+  # - off  : no compression
+  # - gzip : using common java gzip
+  algorithm = gzip
+
+  # If compression is enabled with the `algorithm` setting, the payload is compressed
+  # when it's larger than this value.
+  compress-larger-than = 32 KiB
+}
+#//#compression
+
 akka.actor {
   serializers {
     jackson-json = "akka.serialization.jackson.JacksonJsonSerializer"
diff --git a/akka-serialization-jackson/src/main/scala/akka/serialization/jackson/JacksonSerializer.scala b/akka-serialization-jackson/src/main/scala/akka/serialization/jackson/JacksonSerializer.scala
index 5ffadc2012..46d1fea755 100644
--- a/akka-serialization-jackson/src/main/scala/akka/serialization/jackson/JacksonSerializer.scala
+++ b/akka-serialization-jackson/src/main/scala/akka/serialization/jackson/JacksonSerializer.scala
@@ -80,6 +80,11 @@ import com.fasterxml.jackson.dataformat.cbor.CBORFactory
   val disallowedSerializationBindings: Set[Class[_]] =
     Set(classOf[java.io.Serializable], classOf[java.io.Serializable], classOf[java.lang.Comparable[_]])
 
+  def isGZipped(bytes: Array[Byte]): Boolean = {
+    (bytes != null) && (bytes.length >= 2) &&
+    (bytes(0) == GZIPInputStream.GZIP_MAGIC.toByte) &&
+    (bytes(1) == (GZIPInputStream.GZIP_MAGIC >> 8).toByte)
+  }
 }
 
 /**
@@ -104,14 +109,21 @@ import com.fasterxml.jackson.dataformat.cbor.CBORFactory
       bindingName,
       JacksonObjectMapperProvider(system).getOrCreate(bindingName, Some(new CBORFactory)))
 
+@InternalApi object Compression {
+  sealed trait Algorithm
+  object Off extends Algorithm
+  final case class GZip(largerThan: Long) extends Algorithm
+  // TODO add LZ4, issue #27066
+}
+
 /**
  * INTERNAL API: Base class for Jackson serializers.
  *
  * Configuration in `akka.serialization.jackson` section.
  * It will load Jackson modules defined in configuration `jackson-modules`.
 *
- * It will compress the payload if the the payload is larger than the configured
- * `compress-larger-than` value.
+ * It will compress the payload if the compression `algorithm` is enabled and the
+ * payload is larger than the configured `compress-larger-than` value.
  */
 @InternalApi private[akka] abstract class JacksonSerializer(
     val system: ExtendedActorSystem,
@@ -119,6 +131,7 @@ import com.fasterxml.jackson.dataformat.cbor.CBORFactory
     val objectMapper: ObjectMapper)
     extends SerializerWithStringManifest {
   import JacksonSerializer.GadgetClassBlacklist
+  import JacksonSerializer.isGZipped
 
   // TODO issue #27107: it should be possible to implement ByteBufferSerializer as well, using Jackson's
   // ByteBufferBackedOutputStream/ByteBufferBackedInputStream
@@ -127,11 +140,16 @@ import com.fasterxml.jackson.dataformat.cbor.CBORFactory
   private val conf = JacksonObjectMapperProvider.configForBinding(bindingName, system.settings.config)
   private val isDebugEnabled = conf.getBoolean("verbose-debug-logging") && log.isDebugEnabled
   private final val BufferSize = 1024 * 4
-  private val compressLargerThan: Long = {
-    val key = "compress-larger-than"
-    toRootLowerCase(conf.getString(key)) match {
-      case "off" => Long.MaxValue
-      case _     => conf.getBytes(key)
+  private val compressionAlgorithm: Compression.Algorithm = {
+    toRootLowerCase(conf.getString("compression.algorithm")) match {
+      case "off" => Compression.Off
+      case "gzip" =>
+        val compressLargerThan = conf.getBytes("compression.compress-larger-than")
+        Compression.GZip(compressLargerThan)
+      case other =>
+        throw new IllegalArgumentException(
+          s"Unknown compression algorithm [$other], possible values are " +
+          """"off" or "gzip"""")
     }
   }
   private val migrations: Map[String, JacksonMigration] = {
@@ -171,9 +189,7 @@ import com.fasterxml.jackson.dataformat.cbor.CBORFactory
     checkAllowedSerializationBindings()
     val startTime = if (isDebugEnabled) System.nanoTime else 0L
     val bytes = objectMapper.writeValueAsBytes(obj)
-    val result =
-      if (bytes.length > compressLargerThan) compress(bytes)
-      else bytes
+    val result = compress(bytes)
 
     logToBinaryDuration(obj, startTime, bytes, result)
 
@@ -202,7 +218,6 @@ import com.fasterxml.jackson.dataformat.cbor.CBORFactory
   override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = {
     checkAllowedSerializationBindings()
     val startTime = if (isDebugEnabled) System.nanoTime else 0L
-    val compressed = isGZipped(bytes)
 
     val (fromVersion, manifestClassName) = parseManifest(manifest)
     checkAllowedClassName(manifestClassName)
@@ -243,18 +258,18 @@ import com.fasterxml.jackson.dataformat.cbor.CBORFactory
       }
     checkAllowedClass(clazz)
 
-    val decompressBytes = if (compressed) decompress(bytes) else bytes
+    val decompressedBytes = decompress(bytes)
     val result = migration match {
       case Some(transformer) if fromVersion < transformer.currentVersion =>
-        val jsonTree = objectMapper.readTree(decompressBytes)
+        val jsonTree = objectMapper.readTree(decompressedBytes)
         val newJsonTree = transformer.transform(fromVersion, jsonTree)
         objectMapper.treeToValue(newJsonTree, clazz)
       case _ =>
-        objectMapper.readValue(decompressBytes, clazz)
+        objectMapper.readValue(decompressedBytes, clazz)
     }
 
-    logFromBinaryDuration(bytes, decompressBytes, startTime, clazz)
+    logFromBinaryDuration(bytes, decompressedBytes, startTime, clazz)
 
     result
 
@@ -383,6 +398,14 @@ import com.fasterxml.jackson.dataformat.cbor.CBORFactory
   }
 
   def compress(bytes: Array[Byte]): Array[Byte] = {
+    compressionAlgorithm match {
+      case Compression.Off => bytes
+      case Compression.GZip(largerThan) =>
+        if (bytes.length > largerThan) compressGzip(bytes) else bytes
+    }
+  }
+
+  private def compressGzip(bytes: Array[Byte]): Array[Byte] = {
     val bos = new ByteArrayOutputStream(BufferSize)
     val zip = new GZIPOutputStream(bos)
     try zip.write(bytes)
     finally zip.close()
@@ -391,6 +414,13 @@ import com.fasterxml.jackson.dataformat.cbor.CBORFactory
   }
 
   def decompress(bytes: Array[Byte]): Array[Byte] = {
+    if (isGZipped(bytes))
+      decompressGzip(bytes)
+    else
+      bytes
+  }
+
+  private def decompressGzip(bytes: Array[Byte]): Array[Byte] = {
     val in = new GZIPInputStream(new ByteArrayInputStream(bytes))
     val out = new ByteArrayOutputStream()
     val buffer = new Array[Byte](BufferSize)
@@ -407,9 +437,4 @@ import com.fasterxml.jackson.dataformat.cbor.CBORFactory
     out.toByteArray
   }
 
-  def isGZipped(bytes: Array[Byte]): Boolean = {
-    (bytes != null) && (bytes.length >= 2) &&
-    (bytes(0) == GZIPInputStream.GZIP_MAGIC.toByte) &&
-    (bytes(1) == (GZIPInputStream.GZIP_MAGIC >> 8).toByte)
-  }
 }
diff --git a/akka-serialization-jackson/src/test/scala/akka/serialization/jackson/JacksonSerializerSpec.scala b/akka-serialization-jackson/src/test/scala/akka/serialization/jackson/JacksonSerializerSpec.scala
index b0c8f7e042..94cc93866c 100644
--- a/akka-serialization-jackson/src/test/scala/akka/serialization/jackson/JacksonSerializerSpec.scala
+++ b/akka-serialization-jackson/src/test/scala/akka/serialization/jackson/JacksonSerializerSpec.scala
@@ -110,7 +110,13 @@ class ScalaTestEventMigration extends JacksonMigration {
   }
 }
 
-class JacksonCborSerializerSpec extends JacksonSerializerSpec("jackson-cbor")
+class JacksonCborSerializerSpec extends JacksonSerializerSpec("jackson-cbor") {
+  "have compression disabled by default" in {
+    val conf = JacksonObjectMapperProvider.configForBinding("jackson-cbor", system.settings.config)
+    val compressionAlgo = conf.getString("compression.algorithm")
+    compressionAlgo should ===("off")
+  }
+}
 
 @silent // this test uses Jackson deprecated APIs
 class JacksonJsonSerializerSpec extends JacksonSerializerSpec("jackson-json") {
@@ -453,6 +459,24 @@ class JacksonJsonSerializerSpec extends JacksonSerializerSpec("jackson-json") {
       deserializeFromJsonString(json, serializer.identifier, serializer.manifest(expected)) should ===(expected)
     }
+
+    "compress large payload with gzip" in {
+      val conf = JacksonObjectMapperProvider.configForBinding("jackson-json", system.settings.config)
+      val compressionAlgo = conf.getString("compression.algorithm")
+      compressionAlgo should ===("gzip")
+      val compressLargerThan = conf.getBytes("compression.compress-larger-than")
+      compressLargerThan should ===(32 * 1024)
+      val msg = SimpleCommand("0" * (compressLargerThan + 1).toInt)
+      val bytes = serializeToBinary(msg)
+      JacksonSerializer.isGZipped(bytes) should ===(true)
+      bytes.length should be < compressLargerThan.toInt
+    }
+
+    "not compress small payload with gzip" in {
+      val msg = SimpleCommand("0" * 1000)
+      val bytes = serializeToBinary(msg)
+      JacksonSerializer.isGZipped(bytes) should ===(false)
+    }
   }
 }
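
Usage note (not part of the patch): the sketch below shows how the new per-binding compression settings could be exercised from application code, mirroring the ConfigFactory.parseString pattern used in the benchmark above. The marker trait AppCborSerializable, the LargeEvent message class and the CompressionRoundTrip object are hypothetical names introduced only for illustration, and the example assumes they live in the default package so the quoted serialization-bindings key resolves. It enables gzip for the jackson-cbor binding, which this patch leaves off by default; the gzip magic-byte check is inlined because JacksonSerializer.isGZipped is Akka-internal API.

```scala
import akka.actor.ActorSystem
import akka.serialization.{ SerializationExtension, Serializers }
import com.typesafe.config.ConfigFactory

// Illustrative message type bound to jackson-cbor via the config below;
// not part of this patch.
trait AppCborSerializable
final case class LargeEvent(payload: String) extends AppCborSerializable

object CompressionRoundTrip {
  def main(args: Array[String]): Unit = {
    val config = ConfigFactory.parseString("""
      akka.actor.serialization-bindings {
        "AppCborSerializable" = jackson-cbor
      }
      # Same shape of configuration as the jackson-json defaults in reference.conf,
      # but for the jackson-cbor binding where compression is off by default.
      akka.serialization.jackson.jackson-cbor.compression {
        algorithm = gzip
        compress-larger-than = 1 KiB
      }
      """).withFallback(ConfigFactory.load())

    val system = ActorSystem("CompressionRoundTrip", config)
    try {
      val serialization = SerializationExtension(system)
      val event = LargeEvent("x" * 10000) // well above the 1 KiB threshold

      val serializer = serialization.findSerializerFor(event)
      val manifest = Serializers.manifestFor(serializer, event)
      val bytes = serializer.toBinary(event)

      // GZIP streams start with the magic bytes 0x1f 0x8b.
      val gzipped = bytes.length >= 2 && bytes(0) == 0x1f.toByte && bytes(1) == 0x8b.toByte
      println(s"serialized ${bytes.length} bytes, gzipped: $gzipped")

      // Deserialization does not depend on the compression setting: decompress
      // inspects the gzip header, so compressed payloads remain readable even
      // after compression is turned off again.
      val restored = serialization.deserialize(bytes, serializer.identifier, manifest).get
      assert(restored == event)
    } finally {
      system.terminate()
    }
  }
}
```

In a real project the serialization-bindings key would be the fully qualified class name of the marker trait rather than the bare name used here.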