add lz4 compression for jackson (#29012)

This commit is contained in:
contrun 2020-05-22 04:37:22 -07:00 committed by GitHub
parent 6a07c4f809
commit 11816df236
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 152 additions and 48 deletions

View file

@ -190,6 +190,10 @@ class JacksonSerializationBench {
@Param(Array("jackson-json", "jackson-cbor")) // "java"
private var serializerName: String = _
@silent("immutable val")
@Param(Array("off", "gzip", "lz4"))
private var compression: String = _
@Setup(Level.Trial)
def setupTrial(): Unit = {
val config = ConfigFactory.parseString(s"""
@ -208,7 +212,7 @@ class JacksonSerializationBench {
}
}
akka.serialization.jackson.jackson-json.compression {
algorithm = off
algorithm = $compression
compress-larger-than = 100 b
}
""")
@ -222,10 +226,18 @@ class JacksonSerializationBench {
Await.result(system.terminate(), 5.seconds)
}
private var size = 0L
private def serializeDeserialize[T <: AnyRef](msg: T): T = {
serialization.findSerializerFor(msg) match {
case serializer: SerializerWithStringManifest =>
val blob = serializer.toBinary(msg)
if (size != blob.length) {
size = blob.length
println(
s"# Size is $size of ${msg.getClass.getName} with " +
s"${system.settings.config.getString("akka.serialization.jackson.jackson-json.compression.algorithm")}")
}
serializer.fromBinary(blob, serializer.manifest(msg)).asInstanceOf[T]
case serializer =>
val blob = serializer.toBinary(msg)

View file

@ -377,7 +377,9 @@ the `jackson-json` binding the default configuration is:
@@snip [reference.conf](/akka-serialization-jackson/src/main/resources/reference.conf) { #compression }
Messages larger than the `compress-larger-than` property are compressed with GZIP.
Supported compression algorithms are: gzip, lz4. Use 'off' to disable compression.
Gzip is generally slower than lz4.
Messages larger than the `compress-larger-than` property are compressed.
Compression can be disabled by setting the `algorithm` property to `off`. It will still be able to decompress
payloads that were compressed when serialized, e.g. if this configuration is changed.

View file

@ -188,6 +188,7 @@ akka.serialization.jackson.jackson-json.compression {
# Compression algorithm.
# - off : no compression
# - gzip : using common java gzip
# - lz4 : using lz4-java
algorithm = gzip
# If compression is enabled with the `algorithm` setting the payload is compressed

View file

@ -4,29 +4,25 @@
package akka.serialization.jackson
import java.io.ByteArrayInputStream
import java.io.ByteArrayOutputStream
import java.io.NotSerializableException
import java.util.zip.GZIPInputStream
import java.util.zip.GZIPOutputStream
import java.io.{ ByteArrayInputStream, ByteArrayOutputStream, NotSerializableException }
import java.nio.ByteBuffer
import java.util.zip.{ GZIPInputStream, GZIPOutputStream }
import scala.annotation.tailrec
import scala.util.Failure
import scala.util.Success
import scala.util.{ Failure, Success }
import scala.util.control.NonFatal
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.jsontype.impl.SubTypeValidator
import com.fasterxml.jackson.dataformat.cbor.CBORFactory
import net.jpountz.lz4.LZ4Factory
import akka.actor.ExtendedActorSystem
import akka.annotation.InternalApi
import akka.event.LogMarker
import akka.event.Logging
import akka.serialization.BaseSerializer
import akka.serialization.SerializationExtension
import akka.serialization.SerializerWithStringManifest
import akka.event.{ LogMarker, Logging }
import akka.serialization.{ BaseSerializer, SerializationExtension, SerializerWithStringManifest }
import akka.util.Helpers.toRootLowerCase
import akka.util.OptionVal
/**
* INTERNAL API
@ -86,6 +82,51 @@ import akka.util.Helpers.toRootLowerCase
(bytes(0) == GZIPInputStream.GZIP_MAGIC.toByte) &&
(bytes(1) == (GZIPInputStream.GZIP_MAGIC >> 8).toByte)
}
final case class LZ4Meta(offset: Int, length: Int) {
import LZ4Meta._
def putInto(buffer: ByteBuffer): Unit = {
buffer.putInt(LZ4_MAGIC)
buffer.putInt(length)
}
def prependTo(bytes: Array[Byte]): Array[Byte] = {
val buffer = ByteBuffer.allocate(bytes.length + offset)
putInto(buffer)
buffer.put(bytes)
buffer.array()
}
}
object LZ4Meta {
val LZ4_MAGIC = 0x87d96df6 // The last 4 bytes of `printf akka | sha512sum`
def apply(bytes: Array[Byte]): LZ4Meta = {
LZ4Meta(8, bytes.length)
}
def get(buffer: ByteBuffer): OptionVal[LZ4Meta] = {
if (buffer.remaining() < 4) {
OptionVal.None
} else if (buffer.getInt() != LZ4_MAGIC) {
OptionVal.None
} else {
OptionVal.Some(LZ4Meta(8, buffer.getInt()))
}
}
def get(bytes: Array[Byte]): OptionVal[LZ4Meta] = {
get(ByteBuffer.wrap(bytes))
}
}
def isLZ4(bytes: Array[Byte]): Boolean = {
LZ4Meta.get(bytes).isDefined
}
}
/**
@ -114,7 +155,7 @@ import akka.util.Helpers.toRootLowerCase
sealed trait Algoritm
object Off extends Algoritm
final case class GZip(largerThan: Long) extends Algoritm
// TODO add LZ4, issue #27066
final case class LZ4(largerThan: Long) extends Algoritm
}
/**
@ -131,8 +172,7 @@ import akka.util.Helpers.toRootLowerCase
val bindingName: String,
val objectMapper: ObjectMapper)
extends SerializerWithStringManifest {
import JacksonSerializer.GadgetClassBlacklist
import JacksonSerializer.isGZipped
import JacksonSerializer._
// TODO issue #27107: it should be possible to implement ByteBufferSerializer as well, using Jackson's
// ByteBufferBackedOutputStream/ByteBufferBackedInputStream
@ -147,6 +187,9 @@ import akka.util.Helpers.toRootLowerCase
case "gzip" =>
val compressLargerThan = conf.getBytes("compression.compress-larger-than")
Compression.GZip(compressLargerThan)
case "lz4" =>
val compressLargerThan = conf.getBytes("compression.compress-larger-than")
Compression.LZ4(compressLargerThan)
case other =>
throw new IllegalArgumentException(
s"Unknown compression algorithm [$other], possible values are " +
@ -209,6 +252,10 @@ import akka.util.Helpers.toRootLowerCase
// doesn't have to be volatile, doesn't matter if check is run more than once
private var serializationBindingsCheckedOk = false
private lazy val lz4Factory = LZ4Factory.fastestInstance()
private lazy val lz4Compressor = lz4Factory.fastCompressor()
private lazy val lz4Decompressor = lz4Factory.safeDecompressor()
override val identifier: Int = BaseSerializer.identifierFromConfig(bindingName, system)
override def manifest(obj: AnyRef): String = {
@ -446,42 +493,47 @@ import akka.util.Helpers.toRootLowerCase
def compress(bytes: Array[Byte]): Array[Byte] = {
compressionAlgorithm match {
case Compression.Off => bytes
case Compression.GZip(largerThan) =>
if (bytes.length > largerThan) compressGzip(bytes) else bytes
case Compression.Off => bytes
case Compression.GZip(largerThan) if bytes.length <= largerThan => bytes
case Compression.GZip(_) =>
val bos = new ByteArrayOutputStream(BufferSize)
val zip = new GZIPOutputStream(bos)
try zip.write(bytes)
finally zip.close()
bos.toByteArray
case Compression.LZ4(largerThan) if bytes.length <= largerThan => bytes
case Compression.LZ4(_) => {
val meta = LZ4Meta(bytes)
val compressed = lz4Compressor.compress(bytes)
meta.prependTo(compressed)
}
}
}
private def compressGzip(bytes: Array[Byte]): Array[Byte] = {
val bos = new ByteArrayOutputStream(BufferSize)
val zip = new GZIPOutputStream(bos)
try zip.write(bytes)
finally zip.close()
bos.toByteArray
}
def decompress(bytes: Array[Byte]): Array[Byte] = {
if (isGZipped(bytes))
decompressGzip(bytes)
else
bytes
}
if (isGZipped(bytes)) {
val in = new GZIPInputStream(new ByteArrayInputStream(bytes))
val out = new ByteArrayOutputStream()
val buffer = new Array[Byte](BufferSize)
private def decompressGzip(bytes: Array[Byte]): Array[Byte] = {
val in = new GZIPInputStream(new ByteArrayInputStream(bytes))
val out = new ByteArrayOutputStream()
val buffer = new Array[Byte](BufferSize)
@tailrec def readChunk(): Unit = in.read(buffer) match {
case -1 => ()
case n =>
out.write(buffer, 0, n)
readChunk()
}
@tailrec def readChunk(): Unit = in.read(buffer) match {
case -1 => ()
case n =>
out.write(buffer, 0, n)
readChunk()
try readChunk()
finally in.close()
out.toByteArray
} else {
LZ4Meta.get(bytes) match {
case OptionVal.None => bytes
case OptionVal.Some(meta) =>
val srcLen = bytes.length - meta.offset
lz4Decompressor.decompress(bytes, meta.offset, srcLen, meta.length)
}
}
try readChunk()
finally in.close()
out.toByteArray
}
}

View file

@ -508,6 +508,41 @@ class JacksonJsonSerializerSpec extends JacksonSerializerSpec("jackson-json") {
val bytes = serializeToBinary(msg)
JacksonSerializer.isGZipped(bytes) should ===(false)
}
"compress large payload with lz4" in withSystem("""
akka.serialization.jackson.jackson-json.compression {
algorithm = lz4
compress-larger-than = 32 KiB
}
""") { sys =>
val conf = JacksonObjectMapperProvider.configForBinding("jackson-json", sys.settings.config)
val compressLargerThan = conf.getBytes("compression.compress-larger-than")
def check(msg: AnyRef, compressed: Boolean): Unit = {
val bytes = serializeToBinary(msg, sys)
JacksonSerializer.isLZ4(bytes) should ===(compressed)
bytes.length should be < compressLargerThan.toInt
checkSerialization(msg, sys)
}
check(SimpleCommand("0" * (compressLargerThan + 1).toInt), true)
}
"not compress small payload with lz4" in withSystem("""
akka.serialization.jackson.jackson-json.compression {
algorithm = lz4
compress-larger-than = 32 KiB
}
""") { sys =>
val conf = JacksonObjectMapperProvider.configForBinding("jackson-json", sys.settings.config)
val compressLargerThan = conf.getBytes("compression.compress-larger-than")
def check(msg: AnyRef, compressed: Boolean): Unit = {
val bytes = serializeToBinary(msg, sys)
JacksonSerializer.isLZ4(bytes) should ===(compressed)
bytes.length should be < compressLargerThan.toInt
checkSerialization(msg, sys)
}
check(SimpleCommand("Bob"), false)
check(new SimpleCommandNotCaseClass("Bob"), false)
}
}
"JacksonJsonSerializer without type in manifest" should {
@ -639,13 +674,13 @@ abstract class JacksonSerializerSpec(serializerName: String)
val serializer = serializerFor(obj, sys)
val manifest = serializer.manifest(obj)
val serializerId = serializer.identifier
val blob = serializeToBinary(obj)
val blob = serializeToBinary(obj, sys)
// Issue #28918, check that CBOR format is used (not JSON).
if (blob.length > 0) {
serializer match {
case _: JacksonJsonSerializer =>
if (!JacksonSerializer.isGZipped(blob))
if (!JacksonSerializer.isGZipped(blob) && !JacksonSerializer.isLZ4(blob))
new String(blob.take(1), StandardCharsets.UTF_8) should ===("{")
case _: JacksonCborSerializer =>
new String(blob.take(1), StandardCharsets.UTF_8) should !==("{")

View file

@ -93,6 +93,7 @@ object Dependencies {
val jacksonScala = "com.fasterxml.jackson.module" %% "jackson-module-scala" % jacksonVersion // ApacheV2
val jacksonParameterNames = "com.fasterxml.jackson.module" % "jackson-module-parameter-names" % jacksonVersion // ApacheV2
val jacksonCbor = "com.fasterxml.jackson.dataformat" % "jackson-dataformat-cbor" % jacksonVersion // ApacheV2
val lz4Java = "org.lz4" % "lz4-java" % "1.7.1" // ApacheV2
val logback = "ch.qos.logback" % "logback-classic" % logbackVersion // EPL 1.0
@ -265,6 +266,7 @@ object Dependencies {
jacksonJsr310,
jacksonParameterNames,
jacksonCbor,
lz4Java,
Test.junit,
Test.scalatest)