From 11816df2367933580b99b88cc4ac66e0042d4fda Mon Sep 17 00:00:00 2001 From: contrun Date: Fri, 22 May 2020 04:37:22 -0700 Subject: [PATCH] add lz4 compression for jackson (#29012) --- .../jackson/JacksonSerializationBench.scala | 14 +- .../src/main/paradox/serialization-jackson.md | 4 +- .../src/main/resources/reference.conf | 1 + .../jackson/JacksonSerializer.scala | 140 ++++++++++++------ .../jackson/JacksonSerializerSpec.scala | 39 ++++- project/Dependencies.scala | 2 + 6 files changed, 152 insertions(+), 48 deletions(-) diff --git a/akka-bench-jmh/src/main/scala/akka/serialization/jackson/JacksonSerializationBench.scala b/akka-bench-jmh/src/main/scala/akka/serialization/jackson/JacksonSerializationBench.scala index 2289f214db..e11418911c 100644 --- a/akka-bench-jmh/src/main/scala/akka/serialization/jackson/JacksonSerializationBench.scala +++ b/akka-bench-jmh/src/main/scala/akka/serialization/jackson/JacksonSerializationBench.scala @@ -190,6 +190,10 @@ class JacksonSerializationBench { @Param(Array("jackson-json", "jackson-cbor")) // "java" private var serializerName: String = _ + @silent("immutable val") + @Param(Array("off", "gzip", "lz4")) + private var compression: String = _ + @Setup(Level.Trial) def setupTrial(): Unit = { val config = ConfigFactory.parseString(s""" @@ -208,7 +212,7 @@ class JacksonSerializationBench { } } akka.serialization.jackson.jackson-json.compression { - algorithm = off + algorithm = $compression compress-larger-than = 100 b } """) @@ -222,10 +226,18 @@ class JacksonSerializationBench { Await.result(system.terminate(), 5.seconds) } + private var size = 0L + private def serializeDeserialize[T <: AnyRef](msg: T): T = { serialization.findSerializerFor(msg) match { case serializer: SerializerWithStringManifest => val blob = serializer.toBinary(msg) + if (size != blob.length) { + size = blob.length + println( + s"# Size is $size of ${msg.getClass.getName} with " + + s"${system.settings.config.getString("akka.serialization.jackson.jackson-json.compression.algorithm")}") + } serializer.fromBinary(blob, serializer.manifest(msg)).asInstanceOf[T] case serializer => val blob = serializer.toBinary(msg) diff --git a/akka-docs/src/main/paradox/serialization-jackson.md b/akka-docs/src/main/paradox/serialization-jackson.md index df95ae5e46..775344ab92 100644 --- a/akka-docs/src/main/paradox/serialization-jackson.md +++ b/akka-docs/src/main/paradox/serialization-jackson.md @@ -377,7 +377,9 @@ the `jackson-json` binding the default configuration is: @@snip [reference.conf](/akka-serialization-jackson/src/main/resources/reference.conf) { #compression } -Messages larger than the `compress-larger-than` property are compressed with GZIP. +Supported compression algorithms are: gzip, lz4. Use 'off' to disable compression. +Gzip is generally slower than lz4. +Messages larger than the `compress-larger-than` property are compressed. Compression can be disabled by setting the `algorithm` property to `off`. It will still be able to decompress payloads that were compressed when serialized, e.g. if this configuration is changed. diff --git a/akka-serialization-jackson/src/main/resources/reference.conf b/akka-serialization-jackson/src/main/resources/reference.conf index 20c162ff5e..04df3d9279 100644 --- a/akka-serialization-jackson/src/main/resources/reference.conf +++ b/akka-serialization-jackson/src/main/resources/reference.conf @@ -188,6 +188,7 @@ akka.serialization.jackson.jackson-json.compression { # Compression algorithm. # - off : no compression # - gzip : using common java gzip + # - lz4 : using lz4-java algorithm = gzip # If compression is enabled with the `algorithm` setting the payload is compressed diff --git a/akka-serialization-jackson/src/main/scala/akka/serialization/jackson/JacksonSerializer.scala b/akka-serialization-jackson/src/main/scala/akka/serialization/jackson/JacksonSerializer.scala index c83e3d74fa..001adf0106 100644 --- a/akka-serialization-jackson/src/main/scala/akka/serialization/jackson/JacksonSerializer.scala +++ b/akka-serialization-jackson/src/main/scala/akka/serialization/jackson/JacksonSerializer.scala @@ -4,29 +4,25 @@ package akka.serialization.jackson -import java.io.ByteArrayInputStream -import java.io.ByteArrayOutputStream -import java.io.NotSerializableException -import java.util.zip.GZIPInputStream -import java.util.zip.GZIPOutputStream +import java.io.{ ByteArrayInputStream, ByteArrayOutputStream, NotSerializableException } +import java.nio.ByteBuffer +import java.util.zip.{ GZIPInputStream, GZIPOutputStream } import scala.annotation.tailrec -import scala.util.Failure -import scala.util.Success +import scala.util.{ Failure, Success } import scala.util.control.NonFatal import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.databind.jsontype.impl.SubTypeValidator import com.fasterxml.jackson.dataformat.cbor.CBORFactory +import net.jpountz.lz4.LZ4Factory import akka.actor.ExtendedActorSystem import akka.annotation.InternalApi -import akka.event.LogMarker -import akka.event.Logging -import akka.serialization.BaseSerializer -import akka.serialization.SerializationExtension -import akka.serialization.SerializerWithStringManifest +import akka.event.{ LogMarker, Logging } +import akka.serialization.{ BaseSerializer, SerializationExtension, SerializerWithStringManifest } import akka.util.Helpers.toRootLowerCase +import akka.util.OptionVal /** * INTERNAL API @@ -86,6 +82,51 @@ import akka.util.Helpers.toRootLowerCase (bytes(0) == GZIPInputStream.GZIP_MAGIC.toByte) && (bytes(1) == (GZIPInputStream.GZIP_MAGIC >> 8).toByte) } + + final case class LZ4Meta(offset: Int, length: Int) { + import LZ4Meta._ + + def putInto(buffer: ByteBuffer): Unit = { + buffer.putInt(LZ4_MAGIC) + buffer.putInt(length) + } + + def prependTo(bytes: Array[Byte]): Array[Byte] = { + val buffer = ByteBuffer.allocate(bytes.length + offset) + putInto(buffer) + buffer.put(bytes) + buffer.array() + } + + } + + object LZ4Meta { + val LZ4_MAGIC = 0x87d96df6 // The last 4 bytes of `printf akka | sha512sum` + + def apply(bytes: Array[Byte]): LZ4Meta = { + LZ4Meta(8, bytes.length) + } + + def get(buffer: ByteBuffer): OptionVal[LZ4Meta] = { + if (buffer.remaining() < 4) { + OptionVal.None + } else if (buffer.getInt() != LZ4_MAGIC) { + OptionVal.None + } else { + OptionVal.Some(LZ4Meta(8, buffer.getInt())) + } + } + + def get(bytes: Array[Byte]): OptionVal[LZ4Meta] = { + get(ByteBuffer.wrap(bytes)) + } + + } + + def isLZ4(bytes: Array[Byte]): Boolean = { + LZ4Meta.get(bytes).isDefined + } + } /** @@ -114,7 +155,7 @@ import akka.util.Helpers.toRootLowerCase sealed trait Algoritm object Off extends Algoritm final case class GZip(largerThan: Long) extends Algoritm - // TODO add LZ4, issue #27066 + final case class LZ4(largerThan: Long) extends Algoritm } /** @@ -131,8 +172,7 @@ import akka.util.Helpers.toRootLowerCase val bindingName: String, val objectMapper: ObjectMapper) extends SerializerWithStringManifest { - import JacksonSerializer.GadgetClassBlacklist - import JacksonSerializer.isGZipped + import JacksonSerializer._ // TODO issue #27107: it should be possible to implement ByteBufferSerializer as well, using Jackson's // ByteBufferBackedOutputStream/ByteBufferBackedInputStream @@ -147,6 +187,9 @@ import akka.util.Helpers.toRootLowerCase case "gzip" => val compressLargerThan = conf.getBytes("compression.compress-larger-than") Compression.GZip(compressLargerThan) + case "lz4" => + val compressLargerThan = conf.getBytes("compression.compress-larger-than") + Compression.LZ4(compressLargerThan) case other => throw new IllegalArgumentException( s"Unknown compression algorithm [$other], possible values are " + @@ -209,6 +252,10 @@ import akka.util.Helpers.toRootLowerCase // doesn't have to be volatile, doesn't matter if check is run more than once private var serializationBindingsCheckedOk = false + private lazy val lz4Factory = LZ4Factory.fastestInstance() + private lazy val lz4Compressor = lz4Factory.fastCompressor() + private lazy val lz4Decompressor = lz4Factory.safeDecompressor() + override val identifier: Int = BaseSerializer.identifierFromConfig(bindingName, system) override def manifest(obj: AnyRef): String = { @@ -446,42 +493,47 @@ import akka.util.Helpers.toRootLowerCase def compress(bytes: Array[Byte]): Array[Byte] = { compressionAlgorithm match { - case Compression.Off => bytes - case Compression.GZip(largerThan) => - if (bytes.length > largerThan) compressGzip(bytes) else bytes + case Compression.Off => bytes + case Compression.GZip(largerThan) if bytes.length <= largerThan => bytes + case Compression.GZip(_) => + val bos = new ByteArrayOutputStream(BufferSize) + val zip = new GZIPOutputStream(bos) + try zip.write(bytes) + finally zip.close() + bos.toByteArray + case Compression.LZ4(largerThan) if bytes.length <= largerThan => bytes + case Compression.LZ4(_) => { + val meta = LZ4Meta(bytes) + val compressed = lz4Compressor.compress(bytes) + meta.prependTo(compressed) + } } } - private def compressGzip(bytes: Array[Byte]): Array[Byte] = { - val bos = new ByteArrayOutputStream(BufferSize) - val zip = new GZIPOutputStream(bos) - try zip.write(bytes) - finally zip.close() - bos.toByteArray - } - def decompress(bytes: Array[Byte]): Array[Byte] = { - if (isGZipped(bytes)) - decompressGzip(bytes) - else - bytes - } + if (isGZipped(bytes)) { + val in = new GZIPInputStream(new ByteArrayInputStream(bytes)) + val out = new ByteArrayOutputStream() + val buffer = new Array[Byte](BufferSize) - private def decompressGzip(bytes: Array[Byte]): Array[Byte] = { - val in = new GZIPInputStream(new ByteArrayInputStream(bytes)) - val out = new ByteArrayOutputStream() - val buffer = new Array[Byte](BufferSize) + @tailrec def readChunk(): Unit = in.read(buffer) match { + case -1 => () + case n => + out.write(buffer, 0, n) + readChunk() + } - @tailrec def readChunk(): Unit = in.read(buffer) match { - case -1 => () - case n => - out.write(buffer, 0, n) - readChunk() + try readChunk() + finally in.close() + out.toByteArray + } else { + LZ4Meta.get(bytes) match { + case OptionVal.None => bytes + case OptionVal.Some(meta) => + val srcLen = bytes.length - meta.offset + lz4Decompressor.decompress(bytes, meta.offset, srcLen, meta.length) + } } - - try readChunk() - finally in.close() - out.toByteArray } } diff --git a/akka-serialization-jackson/src/test/scala/akka/serialization/jackson/JacksonSerializerSpec.scala b/akka-serialization-jackson/src/test/scala/akka/serialization/jackson/JacksonSerializerSpec.scala index 1b45c25c8a..a75ce1f4b5 100644 --- a/akka-serialization-jackson/src/test/scala/akka/serialization/jackson/JacksonSerializerSpec.scala +++ b/akka-serialization-jackson/src/test/scala/akka/serialization/jackson/JacksonSerializerSpec.scala @@ -508,6 +508,41 @@ class JacksonJsonSerializerSpec extends JacksonSerializerSpec("jackson-json") { val bytes = serializeToBinary(msg) JacksonSerializer.isGZipped(bytes) should ===(false) } + + "compress large payload with lz4" in withSystem(""" + akka.serialization.jackson.jackson-json.compression { + algorithm = lz4 + compress-larger-than = 32 KiB + } + """) { sys => + val conf = JacksonObjectMapperProvider.configForBinding("jackson-json", sys.settings.config) + val compressLargerThan = conf.getBytes("compression.compress-larger-than") + def check(msg: AnyRef, compressed: Boolean): Unit = { + val bytes = serializeToBinary(msg, sys) + JacksonSerializer.isLZ4(bytes) should ===(compressed) + bytes.length should be < compressLargerThan.toInt + checkSerialization(msg, sys) + } + check(SimpleCommand("0" * (compressLargerThan + 1).toInt), true) + } + + "not compress small payload with lz4" in withSystem(""" + akka.serialization.jackson.jackson-json.compression { + algorithm = lz4 + compress-larger-than = 32 KiB + } + """) { sys => + val conf = JacksonObjectMapperProvider.configForBinding("jackson-json", sys.settings.config) + val compressLargerThan = conf.getBytes("compression.compress-larger-than") + def check(msg: AnyRef, compressed: Boolean): Unit = { + val bytes = serializeToBinary(msg, sys) + JacksonSerializer.isLZ4(bytes) should ===(compressed) + bytes.length should be < compressLargerThan.toInt + checkSerialization(msg, sys) + } + check(SimpleCommand("Bob"), false) + check(new SimpleCommandNotCaseClass("Bob"), false) + } } "JacksonJsonSerializer without type in manifest" should { @@ -639,13 +674,13 @@ abstract class JacksonSerializerSpec(serializerName: String) val serializer = serializerFor(obj, sys) val manifest = serializer.manifest(obj) val serializerId = serializer.identifier - val blob = serializeToBinary(obj) + val blob = serializeToBinary(obj, sys) // Issue #28918, check that CBOR format is used (not JSON). if (blob.length > 0) { serializer match { case _: JacksonJsonSerializer => - if (!JacksonSerializer.isGZipped(blob)) + if (!JacksonSerializer.isGZipped(blob) && !JacksonSerializer.isLZ4(blob)) new String(blob.take(1), StandardCharsets.UTF_8) should ===("{") case _: JacksonCborSerializer => new String(blob.take(1), StandardCharsets.UTF_8) should !==("{") diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 0ed39976d8..93c05ba38a 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -93,6 +93,7 @@ object Dependencies { val jacksonScala = "com.fasterxml.jackson.module" %% "jackson-module-scala" % jacksonVersion // ApacheV2 val jacksonParameterNames = "com.fasterxml.jackson.module" % "jackson-module-parameter-names" % jacksonVersion // ApacheV2 val jacksonCbor = "com.fasterxml.jackson.dataformat" % "jackson-dataformat-cbor" % jacksonVersion // ApacheV2 + val lz4Java = "org.lz4" % "lz4-java" % "1.7.1" // ApacheV2 val logback = "ch.qos.logback" % "logback-classic" % logbackVersion // EPL 1.0 @@ -265,6 +266,7 @@ object Dependencies { jacksonJsr310, jacksonParameterNames, jacksonCbor, + lz4Java, Test.junit, Test.scalatest)