Move (de)compression helpers to akka-stream module #21395 (#21409)

* Move (de)compression helpers to akka-stream #21395

* Move compression- and decompression-related classes from
  akka-http-experimental to akka-stream
* Add a Compression helper object with functions to create
  decompressing Flows
* Add a short cookbook entry

* =str move compression impl classes into their own directory (and change visibility)

* =str also expose gzip/deflate compression flows

* Fix formatting of plural ByteStrings in cookbook

* =str #21395 make compressor call Deflater.end in postStop to release resources

Also simplified the creation of the flow given a compressor.

* =str #21395 decompressors call Inflater.end in postStop to release resources

* =str #21395 smallish Scaladoc fixes
Olli Helenius 2016-11-17 23:42:37 +02:00 committed by Konrad Malawski
parent d7af58bafa
commit 658b46e1cc
14 changed files with 618 additions and 0 deletions


@@ -0,0 +1,68 @@
/**
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com/>
*/
package docs.stream.javadsl.cookbook;
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import akka.stream.javadsl.Compression;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.testkit.JavaTestKit;
import akka.util.ByteString;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import java.util.zip.GZIPOutputStream;
public class RecipeDecompress extends RecipeTest {
static ActorSystem system;
static Materializer mat;
@BeforeClass
public static void setup() {
system = ActorSystem.create("RecipeDecompress");
mat = ActorMaterializer.create(system);
}
@AfterClass
public static void tearDown() {
JavaTestKit.shutdownActorSystem(system);
system = null;
mat = null;
}
private ByteString gzip(final String s) throws IOException {
final ByteArrayOutputStream buf = new ByteArrayOutputStream();
final GZIPOutputStream out = new GZIPOutputStream(buf);
try {
out.write(s.getBytes(StandardCharsets.UTF_8));
} finally {
out.close();
}
return ByteString.fromArray(buf.toByteArray());
}
@Test
public void parseLines() throws Exception {
final Source<ByteString, NotUsed> compressed = Source.single(gzip("Hello World"));
//#decompress-gzip
final Source<String, NotUsed> uncompressed = compressed
.via(Compression.gunzip(100))
.map(b -> b.utf8String());
//#decompress-gzip
uncompressed.runWith(Sink.head(), mat).toCompletableFuture().get(1, TimeUnit.SECONDS);
}
}


@@ -102,6 +102,17 @@ The :class:`Framing` helper class contains a convenience method to parse messages
.. includecode:: ../code/docs/stream/javadsl/cookbook/RecipeParseLines.java#parse-lines
Dealing with compressed data streams
------------------------------------
**Situation:** A gzipped stream of bytes is given as a stream of ``ByteString`` s, for example from a ``FileIO`` source.
The :class:`Compression` helper class contains convenience methods for decompressing data streams compressed with
Gzip or Deflate.
.. includecode:: ../code/docs/stream/javadsl/cookbook/RecipeDecompress.java#decompress-gzip
Implementing reduce-by-key
--------------------------


@@ -0,0 +1,40 @@
/**
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com/>
*/
package docs.stream.cookbook
import java.io.ByteArrayOutputStream
import java.nio.charset.StandardCharsets
import java.util.zip.GZIPOutputStream
import akka.stream.impl.io.compression.GzipCompressor
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source
import akka.util.ByteString
import scala.annotation.tailrec
import scala.concurrent.Await
import scala.concurrent.duration._
class RecipeDecompress extends RecipeSpec {
def gzip(s: String): ByteString = {
val buf = new ByteArrayOutputStream()
val out = new GZIPOutputStream(buf)
try out.write(s.getBytes(StandardCharsets.UTF_8)) finally out.close()
ByteString(buf.toByteArray)
}
"Recipe for decompressing a Gzip stream" must {
"work" in {
val compressed = Source.single(gzip("Hello World"))
//#decompress-gzip
import akka.stream.scaladsl.Compression
val uncompressed = compressed.via(Compression.gunzip())
.map(_.utf8String)
//#decompress-gzip
Await.result(uncompressed.runWith(Sink.head), 3.seconds) should be("Hello World")
}
}
}


@@ -100,6 +100,16 @@ The :class:`Framing` helper object contains a convenience method to parse messages
.. includecode:: ../code/docs/stream/cookbook/RecipeParseLines.scala#parse-lines
Dealing with compressed data streams
------------------------------------
**Situation:** A gzipped stream of bytes is given as a stream of ``ByteString`` s, for example from a ``FileIO`` source.
The :class:`Compression` helper object contains convenience methods for decompressing data streams compressed with
Gzip or Deflate.
.. includecode:: ../code/docs/stream/cookbook/RecipeDecompress.scala#decompress-gzip
Implementing reduce-by-key
--------------------------


@@ -0,0 +1,45 @@
/**
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.scaladsl
import java.nio.charset.StandardCharsets
import akka.stream.impl.io.compression.{ DeflateCompressor, GzipCompressor }
import akka.stream.testkit.StreamSpec
import akka.stream.testkit.scaladsl.TestSink
import akka.stream.{ ActorMaterializer, ActorMaterializerSettings }
import akka.util.ByteString
class CompressionSpec extends StreamSpec {
val settings = ActorMaterializerSettings(system)
implicit val materializer = ActorMaterializer(settings)
def gzip(s: String): ByteString = new GzipCompressor().compressAndFinish(ByteString(s))
def deflate(s: String): ByteString = new DeflateCompressor().compressAndFinish(ByteString(s))
val data = "hello world"
"Gzip decompression" must {
"be able to decompress a gzipped stream" in {
Source.single(gzip(data))
.via(Compression.gunzip())
.map(_.decodeString(StandardCharsets.UTF_8))
.runWith(TestSink.probe)
.requestNext(data)
.expectComplete()
}
}
"Deflate decompression" must {
"be able to decompress a deflated stream" in {
Source.single(deflate(data))
.via(Compression.inflate())
.map(_.decodeString(StandardCharsets.UTF_8))
.runWith(TestSink.probe)
.requestNext(data)
.expectComplete()
}
}
}


@@ -0,0 +1,44 @@
/**
* Copyright (C) 2009-2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.impl.io.compression
import akka.NotUsed
import akka.stream.{ Attributes, FlowShape }
import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage
import akka.stream.scaladsl.Flow
import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler }
import akka.util.ByteString
/** INTERNAL API */
private[stream] object CompressionUtils {
/**
* Creates a flow from a compressor constructor.
*/
def compressorFlow(newCompressor: () ⇒ Compressor): Flow[ByteString, ByteString, NotUsed] =
Flow.fromGraph {
new SimpleLinearGraphStage[ByteString] {
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler {
val compressor = newCompressor()
override def onPush(): Unit = {
val data = compressor.compress(grab(in))
if (data.nonEmpty) push(out, data)
else pull(in)
}
override def onPull(): Unit = pull(in)
override def onUpstreamFinish(): Unit = {
val data = compressor.finish()
if (data.nonEmpty) emit(out, data)
completeStage()
}
override def postStop(): Unit = compressor.close()
setHandlers(in, out, this)
}
}
}
}


@@ -0,0 +1,40 @@
/**
* Copyright (C) 2009-2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.impl.io.compression
import akka.util.ByteString
/**
* INTERNAL API
*
* A stateful object representing ongoing compression.
*/
private[akka] abstract class Compressor {
/**
* Compresses the given input and returns compressed data. The implementation
* can and will choose to buffer output data to improve compression. Use
* `flush` or `compressAndFlush` to make sure that all input data has been
* compressed and pending output data has been returned.
*/
def compress(input: ByteString): ByteString
/**
* Flushes any output data and returns the currently remaining compressed data.
*/
def flush(): ByteString
/**
* Closes this compressed stream and returns the remaining compressed data. After
* calling this method, this Compressor cannot be used any further.
*/
def finish(): ByteString
/** Combines `compress` + `flush` */
def compressAndFlush(input: ByteString): ByteString
/** Combines `compress` + `finish` */
def compressAndFinish(input: ByteString): ByteString
/** Make sure any resources have been released */
def close(): Unit
}
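For orientation, a minimal sketch of how this contract is exercised by a caller (hedged: the chunk values are illustrative, and since these classes are INTERNAL API the code assumes it runs inside the akka namespace, using the DeflateCompressor added later in this commit):

import akka.util.ByteString

// Drive a Compressor through its lifecycle and release its resources.
val compressor = new DeflateCompressor()
try {
  // Each compressAndFlush result can be decompressed on its own (SYNC_FLUSH).
  val chunk1 = compressor.compressAndFlush(ByteString("first chunk"))
  // finish() emits the stream trailer; the Compressor must not be used afterwards.
  val chunk2 = compressor.compressAndFinish(ByteString("last chunk"))
  chunk1 ++ chunk2 // the complete deflate stream
} finally compressor.close() // safe even after finish(); ends the underlying Deflater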


@@ -0,0 +1,79 @@
/**
* Copyright (C) 2009-2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.impl.io.compression
import java.util.zip.Deflater
import akka.util.{ ByteString, ByteStringBuilder }
import scala.annotation.tailrec
/** INTERNAL API */
private[akka] class DeflateCompressor extends Compressor {
import DeflateCompressor._
protected lazy val deflater = new Deflater(Deflater.BEST_COMPRESSION, false)
override final def compressAndFlush(input: ByteString): ByteString = {
val buffer = newTempBuffer(input.size)
compressWithBuffer(input, buffer) ++ flushWithBuffer(buffer)
}
override final def compressAndFinish(input: ByteString): ByteString = {
val buffer = newTempBuffer(input.size)
compressWithBuffer(input, buffer) ++ finishWithBuffer(buffer)
}
override final def compress(input: ByteString): ByteString = compressWithBuffer(input, newTempBuffer())
override final def flush(): ByteString = flushWithBuffer(newTempBuffer())
override final def finish(): ByteString = finishWithBuffer(newTempBuffer())
protected def compressWithBuffer(input: ByteString, buffer: Array[Byte]): ByteString = {
require(deflater.needsInput())
deflater.setInput(input.toArray)
drainDeflater(deflater, buffer)
}
protected def flushWithBuffer(buffer: Array[Byte]): ByteString = {
val written = deflater.deflate(buffer, 0, buffer.length, Deflater.SYNC_FLUSH)
ByteString.fromArray(buffer, 0, written)
}
protected def finishWithBuffer(buffer: Array[Byte]): ByteString = {
deflater.finish()
val res = drainDeflater(deflater, buffer)
deflater.end()
res
}
def close(): Unit = deflater.end()
private def newTempBuffer(size: Int = 65536): Array[Byte] = {
// The default size is somewhat arbitrary, we'd like to guess a better value but Deflater/zlib
// is buffering in an unpredictable manner.
// `compress` will only return any data if the buffered compressed data has some size in
// the region of 10000-50000 bytes.
// `flush` and `finish` will return any size depending on the previous input.
// This value will hopefully provide a good compromise between memory churn and
// excessive fragmentation of ByteStrings.
// We also make sure that the buffer size stays above a reasonable minimum, to avoid
// draining the deflater with a too-small buffer.
new Array[Byte](math.max(size, MinBufferSize))
}
}
/** INTERNAL API */
private[akka] object DeflateCompressor {
val MinBufferSize = 1024
@tailrec
def drainDeflater(deflater: Deflater, buffer: Array[Byte], result: ByteStringBuilder = new ByteStringBuilder()): ByteString = {
val len = deflater.deflate(buffer)
if (len > 0) {
result ++= ByteString.fromArray(buffer, 0, len)
drainDeflater(deflater, buffer, result)
} else {
require(deflater.needsInput())
result.result()
}
}
}


@@ -0,0 +1,30 @@
/**
* Copyright (C) 2009-2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.impl.io.compression
import java.util.zip.Inflater
import akka.stream.Attributes
import akka.stream.impl.io.ByteStringParser
import akka.stream.impl.io.ByteStringParser.{ ParseResult, ParseStep }
import akka.util.ByteString
/** INTERNAL API */
private[akka] class DeflateDecompressor(maxBytesPerChunk: Int = DeflateDecompressorBase.MaxBytesPerChunkDefault)
extends DeflateDecompressorBase(maxBytesPerChunk) {
override def createLogic(attr: Attributes) = new DecompressorParsingLogic {
override val inflater: Inflater = new Inflater()
override val inflateState = new Inflate(true) {
override def onTruncation(): Unit = completeStage()
}
override def afterInflate = inflateState
override def afterBytesRead(buffer: Array[Byte], offset: Int, length: Int): Unit = {}
startWith(inflateState)
}
}


@@ -0,0 +1,50 @@
/**
* Copyright (C) 2009-2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.impl.io.compression
import java.util.zip.Inflater
import akka.stream.impl.io.ByteStringParser
import akka.stream.impl.io.ByteStringParser.{ ParseResult, ParseStep }
import akka.util.ByteString
/** INTERNAL API */
private[akka] abstract class DeflateDecompressorBase(maxBytesPerChunk: Int = DeflateDecompressorBase.MaxBytesPerChunkDefault)
extends ByteStringParser[ByteString] {
abstract class DecompressorParsingLogic extends ParsingLogic {
val inflater: Inflater
def afterInflate: ParseStep[ByteString]
def afterBytesRead(buffer: Array[Byte], offset: Int, length: Int): Unit
val inflateState: Inflate
abstract class Inflate(noPostProcessing: Boolean) extends ParseStep[ByteString] {
override def canWorkWithPartialData = true
override def parse(reader: ByteStringParser.ByteReader): ParseResult[ByteString] = {
inflater.setInput(reader.remainingData.toArray)
val buffer = new Array[Byte](maxBytesPerChunk)
val read = inflater.inflate(buffer)
reader.skip(reader.remainingSize - inflater.getRemaining)
if (read > 0) {
afterBytesRead(buffer, 0, read)
val next = if (inflater.finished()) afterInflate else this
ParseResult(Some(ByteString.fromArray(buffer, 0, read)), next, noPostProcessing)
} else {
if (inflater.finished()) ParseResult(None, afterInflate, noPostProcessing)
else throw ByteStringParser.NeedMoreData
}
}
}
override def postStop(): Unit = inflater.end()
}
}
/** INTERNAL API */
private[akka] object DeflateDecompressorBase {
final val MaxBytesPerChunkDefault = 64 * 1024
}
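The Inflate step above relies on Inflater.getRemaining to work out how many input bytes were actually consumed before skipping them in the reader. A standalone sketch of that bookkeeping against plain java.util.zip (buffer sizes are illustrative):

import java.util.zip.{ Deflater, Inflater }

val deflater = new Deflater()
deflater.setInput("example input".getBytes("UTF-8"))
deflater.finish()
val compressed = new Array[Byte](256)
val compressedLength = deflater.deflate(compressed)
deflater.end()

val inflater = new Inflater()
inflater.setInput(compressed, 0, compressedLength)
val buffer = new Array[Byte](4) // deliberately tiny, like a small maxBytesPerChunk
while (!inflater.finished()) {
  val read = inflater.inflate(buffer) // may return 0 when more input is needed
  // consumed so far = compressedLength - inflater.getRemaining, mirroring reader.skip above
  if (read > 0) print(new String(buffer, 0, read, "UTF-8"))
}
inflater.end() // the stage does the same in postStop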


@@ -0,0 +1,42 @@
/**
* Copyright (C) 2009-2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.impl.io.compression
import java.util.zip.{ CRC32, Deflater }
import akka.util.ByteString
/** INTERNAL API */
private[akka] class GzipCompressor extends DeflateCompressor {
override protected lazy val deflater = new Deflater(Deflater.BEST_COMPRESSION, true)
private val checkSum = new CRC32 // CRC32 of uncompressed data
private var headerSent = false
private var bytesRead = 0L
override protected def compressWithBuffer(input: ByteString, buffer: Array[Byte]): ByteString = {
updateCrc(input)
header() ++ super.compressWithBuffer(input, buffer)
}
override protected def flushWithBuffer(buffer: Array[Byte]): ByteString = header() ++ super.flushWithBuffer(buffer)
override protected def finishWithBuffer(buffer: Array[Byte]): ByteString = header() ++ super.finishWithBuffer(buffer) ++ trailer()
private def updateCrc(input: ByteString): Unit = {
checkSum.update(input.toArray)
bytesRead += input.length
}
private def header(): ByteString =
if (!headerSent) {
headerSent = true
GzipDecompressor.Header
} else ByteString.empty
private def trailer(): ByteString = {
def int32(i: Int): ByteString = ByteString(i, i >> 8, i >> 16, i >> 24)
val crc = checkSum.getValue.toInt
val tot = bytesRead.toInt // truncated to 32bit as specified in https://tools.ietf.org/html/rfc1952#section-2
val trailer = int32(crc) ++ int32(tot)
trailer
}
}
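A quick way to sanity-check the header and trailer framing above is to round-trip the output through the JDK's own GZIPInputStream; a test sketch under that assumption (not part of this commit, and it assumes access to the INTERNAL API, e.g. from a test in the akka namespace):

import java.io.{ ByteArrayInputStream, ByteArrayOutputStream }
import java.util.zip.GZIPInputStream
import akka.util.ByteString

val compressed = new GzipCompressor().compressAndFinish(ByteString("Hello World"))
val in = new GZIPInputStream(new ByteArrayInputStream(compressed.toArray))
val out = new ByteArrayOutputStream()
val buf = new Array[Byte](64)
var n = in.read(buf)
while (n != -1) { out.write(buf, 0, n); n = in.read(buf) }
new String(out.toByteArray, "UTF-8") // expected: "Hello World"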


@@ -0,0 +1,83 @@
/**
* Copyright (C) 2009-2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.impl.io.compression
import java.util.zip.{ CRC32, Inflater, ZipException }
import akka.stream.Attributes
import akka.stream.impl.io.ByteStringParser
import akka.stream.impl.io.ByteStringParser.{ ParseResult, ParseStep }
import akka.util.ByteString
/** INTERNAL API */
private[akka] class GzipDecompressor(maxBytesPerChunk: Int = DeflateDecompressorBase.MaxBytesPerChunkDefault)
extends DeflateDecompressorBase(maxBytesPerChunk) {
override def createLogic(attr: Attributes) = new DecompressorParsingLogic {
override val inflater: Inflater = new Inflater(true)
override def afterInflate: ParseStep[ByteString] = ReadTrailer
override def afterBytesRead(buffer: Array[Byte], offset: Int, length: Int): Unit =
crc32.update(buffer, offset, length)
trait Step extends ParseStep[ByteString] {
override def onTruncation(): Unit = failStage(new ZipException("Truncated GZIP stream"))
}
override val inflateState = new Inflate(false) with Step
startWith(ReadHeaders)
/** Reading the header bytes */
case object ReadHeaders extends Step {
override def parse(reader: ByteStringParser.ByteReader): ParseResult[ByteString] = {
import reader._
if (readByte() != 0x1F || readByte() != 0x8B) fail("Not in GZIP format") // check magic header
if (readByte() != 8) fail("Unsupported GZIP compression method") // check compression method
val flags = readByte()
skip(6) // skip MTIME, XFL and OS fields
if ((flags & 4) > 0) skip(readShortLE()) // skip optional extra fields
if ((flags & 8) > 0) skipZeroTerminatedString() // skip optional file name
if ((flags & 16) > 0) skipZeroTerminatedString() // skip optional file comment
if ((flags & 2) > 0 && crc16(fromStartToHere) != readShortLE()) fail("Corrupt GZIP header")
inflater.reset()
crc32.reset()
ParseResult(None, inflateState, false)
}
}
var crc32: CRC32 = new CRC32
private def fail(msg: String) = throw new ZipException(msg)
/** Reading the trailer */
case object ReadTrailer extends Step {
override def parse(reader: ByteStringParser.ByteReader): ParseResult[ByteString] = {
import reader._
if (readIntLE() != crc32.getValue.toInt) fail("Corrupt data (CRC32 checksum error)")
if (readIntLE() != inflater.getBytesWritten.toInt /* truncated to 32bit */ )
fail("Corrupt GZIP trailer ISIZE")
ParseResult(None, ReadHeaders, true)
}
}
}
private def crc16(data: ByteString) = {
val crc = new CRC32
crc.update(data.toArray)
crc.getValue.toInt & 0xFFFF
}
}
/** INTERNAL API */
private[akka] object GzipDecompressor {
// RFC 1952: http://tools.ietf.org/html/rfc1952 section 2.2
private[impl] val Header = ByteString(
0x1F, // ID1
0x8B, // ID2
8, // CM = Deflate
0, // FLG
0, // MTIME 1
0, // MTIME 2
0, // MTIME 3
0, // MTIME 4
0, // XFL
0 // OS
)
}


@@ -0,0 +1,26 @@
/**
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.javadsl
import akka.NotUsed
import akka.stream.scaladsl
import akka.util.ByteString
object Compression {
/**
* Creates a Flow that decompresses a gzip-compressed stream of data.
*
* @param maxBytesPerChunk Maximum length of the output [[ByteString]] chunk.
*/
def gunzip(maxBytesPerChunk: Int): Flow[ByteString, ByteString, NotUsed] =
scaladsl.Compression.gunzip(maxBytesPerChunk).asJava
/**
* Creates a Flow that decompresses a deflate-compressed stream of data.
*
* @param maxBytesPerChunk Maximum length of the output [[ByteString]] chunk.
*/
def inflate(maxBytesPerChunk: Int): Flow[ByteString, ByteString, NotUsed] =
scaladsl.Compression.inflate(maxBytesPerChunk).asJava
}


@@ -0,0 +1,50 @@
/**
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.scaladsl
import akka.NotUsed
import akka.stream.impl.io.compression._
import akka.util.ByteString
object Compression {
/**
* Creates a flow that gzip-compresses a stream of ByteStrings. Note that the compressor
* will SYNC_FLUSH after every [[ByteString]] so that it is guaranteed that every [[ByteString]]
* coming out of the flow can be fully decompressed without waiting for additional data. This may
* come at a compression performance cost for very small chunks.
*
* FIXME: should compression level / strategy / flush mode be configurable? See https://github.com/akka/akka/issues/21849
*/
def gzip: Flow[ByteString, ByteString, NotUsed] =
CompressionUtils.compressorFlow(() ⇒ new GzipCompressor)
/**
* Creates a Flow that decompresses a gzip-compressed stream of data.
*
* @param maxBytesPerChunk Maximum length of an output [[ByteString]] chunk.
*/
def gunzip(maxBytesPerChunk: Int = DeflateDecompressorBase.MaxBytesPerChunkDefault): Flow[ByteString, ByteString, NotUsed] =
Flow[ByteString].via(new GzipDecompressor(maxBytesPerChunk))
.named("gunzip")
/**
* Creates a flow that deflate-compresses a stream of ByteStrings. Note that the compressor
* will SYNC_FLUSH after every [[ByteString]] so that it is guaranteed that every [[ByteString]]
* coming out of the flow can be fully decompressed without waiting for additional data. This may
* come at a compression performance cost for very small chunks.
*
* FIXME: should compression level / strategy / flush mode be configurable? See https://github.com/akka/akka/issues/21849
*/
def deflate: Flow[ByteString, ByteString, NotUsed] =
CompressionUtils.compressorFlow(() ⇒ new DeflateCompressor)
/**
* Creates a Flow that decompresses a deflate-compressed stream of data.
*
* @param maxBytesPerChunk Maximum length of an output [[ByteString]] chunk.
*/
def inflate(maxBytesPerChunk: Int = DeflateDecompressorBase.MaxBytesPerChunkDefault): Flow[ByteString, ByteString, NotUsed] =
Flow[ByteString].via(new DeflateDecompressor(maxBytesPerChunk))
.named("inflate")
}
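Taken together, the new public API composes into a simple round trip. A hedged sketch (assumes an implicit materializer in scope, as in the specs above):

import akka.stream.scaladsl.{ Compression, Source }
import akka.util.ByteString

// Because gzip SYNC_FLUSHes per element, every emitted chunk is independently
// decompressible; gunzip() restores the original byte stream.
val roundTrip =
  Source(List(ByteString("Hello"), ByteString(" "), ByteString("World")))
    .via(Compression.gzip)
    .via(Compression.gunzip())
    .runFold(ByteString.empty)(_ ++ _) // completes with ByteString("Hello World")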