diff --git a/akka-docs/rst/java/code/docs/stream/javadsl/cookbook/RecipeDecompress.java b/akka-docs/rst/java/code/docs/stream/javadsl/cookbook/RecipeDecompress.java
new file mode 100644
index 0000000000..201e1a1d06
--- /dev/null
+++ b/akka-docs/rst/java/code/docs/stream/javadsl/cookbook/RecipeDecompress.java
@@ -0,0 +1,70 @@
+/**
+ * Copyright (C) 2016 Lightbend Inc.
+ */
+package docs.stream.javadsl.cookbook;
+
+import akka.NotUsed;
+import akka.actor.ActorSystem;
+import akka.stream.ActorMaterializer;
+import akka.stream.Materializer;
+import akka.stream.javadsl.Compression;
+import akka.stream.javadsl.Sink;
+import akka.stream.javadsl.Source;
+import akka.testkit.JavaTestKit;
+import akka.util.ByteString;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.TimeUnit;
+import java.util.zip.GZIPOutputStream;
+
+public class RecipeDecompress extends RecipeTest {
+
+  static ActorSystem system;
+  static Materializer mat;
+
+  @BeforeClass
+  public static void setup() {
+    system = ActorSystem.create("RecipeDecompress");
+    mat = ActorMaterializer.create(system);
+  }
+
+  @AfterClass
+  public static void tearDown() {
+    JavaTestKit.shutdownActorSystem(system);
+    system = null;
+    mat = null;
+  }
+
+  private ByteString gzip(final String s) throws IOException {
+    final ByteArrayOutputStream buf = new ByteArrayOutputStream();
+    final GZIPOutputStream out = new GZIPOutputStream(buf);
+    try {
+      out.write(s.getBytes(StandardCharsets.UTF_8));
+    } finally {
+      out.close();
+    }
+    return ByteString.fromArray(buf.toByteArray());
+  }
+
+  @Test
+  public void decompressGzip() throws Exception {
+    final Source<ByteString, NotUsed> compressed = Source.single(gzip("Hello World"));
+
+    //#decompress-gzip
+    final Source<String, NotUsed> uncompressed = compressed
+      .via(Compression.gunzip(100))
+      .map(b -> b.utf8String());
+    //#decompress-gzip
+
+    final String result = uncompressed.runWith(Sink.head(), mat)
+      .toCompletableFuture().get(1, TimeUnit.SECONDS);
+    Assert.assertEquals("Hello World", result);
+  }
+
+}
diff --git a/akka-docs/rst/java/stream/stream-cookbook.rst b/akka-docs/rst/java/stream/stream-cookbook.rst
index 1cc819a294..b44f5d334e 100644
--- a/akka-docs/rst/java/stream/stream-cookbook.rst
+++ b/akka-docs/rst/java/stream/stream-cookbook.rst
@@ -102,6 +102,17 @@ The :class:`Framing` helper class contains a convenience method to parse message
 
 .. includecode:: ../code/docs/stream/javadsl/cookbook/RecipeParseLines.java#parse-lines
 
+Dealing with compressed data streams
+------------------------------------
+
+**Situation:** A gzipped stream of bytes is given as a stream of ``ByteString``\ s, for example from a ``FileIO`` source.
+
+The :class:`Compression` helper class contains convenience methods for decompressing data streams compressed with
+Gzip or Deflate.
+
+.. includecode:: ../code/docs/stream/javadsl/cookbook/RecipeDecompress.java#decompress-gzip
+
+
 Implementing reduce-by-key
 --------------------------
 
diff --git a/akka-docs/rst/scala/code/docs/stream/cookbook/RecipeDecompress.scala b/akka-docs/rst/scala/code/docs/stream/cookbook/RecipeDecompress.scala
new file mode 100644
index 0000000000..6a7031a8e6
--- /dev/null
+++ b/akka-docs/rst/scala/code/docs/stream/cookbook/RecipeDecompress.scala
@@ -0,0 +1,40 @@
+/**
+ * Copyright (C) 2016 Lightbend Inc.
+ */
+package docs.stream.cookbook
+
+import java.io.ByteArrayOutputStream
+import java.nio.charset.StandardCharsets
+import java.util.zip.GZIPOutputStream
+
+import akka.stream.scaladsl.Sink
+import akka.stream.scaladsl.Source
+import akka.util.ByteString
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+class RecipeDecompress extends RecipeSpec {
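+  // Compress with the JDK's own GZIPOutputStream so the recipe's gunzip flow
+  // is exercised against gzip data produced outside of Akka's compressor.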
+ */ +package docs.stream.cookbook + +import java.io.ByteArrayOutputStream +import java.nio.charset.StandardCharsets +import java.util.zip.GZIPOutputStream + +import akka.stream.impl.io.compression.GzipCompressor +import akka.stream.scaladsl.Sink +import akka.stream.scaladsl.Source +import akka.util.ByteString + +import scala.annotation.tailrec +import scala.concurrent.Await +import scala.concurrent.duration._ + +class RecipeDecompress extends RecipeSpec { + def gzip(s: String): ByteString = { + val buf = new ByteArrayOutputStream() + val out = new GZIPOutputStream(buf) + try out.write(s.getBytes(StandardCharsets.UTF_8)) finally out.close() + ByteString(buf.toByteArray) + } + + "Recipe for decompressing a Gzip stream" must { + "work" in { + val compressed = Source.single(gzip("Hello World")) + + //#decompress-gzip + import akka.stream.scaladsl.Compression + val uncompressed = compressed.via(Compression.gunzip()) + .map(_.utf8String) + //#decompress-gzip + + Await.result(uncompressed.runWith(Sink.head), 3.seconds) should be("Hello World") + } + } +} diff --git a/akka-docs/rst/scala/stream/stream-cookbook.rst b/akka-docs/rst/scala/stream/stream-cookbook.rst index 8655b3841b..26eb32bb11 100644 --- a/akka-docs/rst/scala/stream/stream-cookbook.rst +++ b/akka-docs/rst/scala/stream/stream-cookbook.rst @@ -100,6 +100,16 @@ The :class:`Framing` helper object contains a convenience method to parse messag .. includecode:: ../code/docs/stream/cookbook/RecipeParseLines.scala#parse-lines +Dealing with compressed data streams +------------------------------------ + +**Situation:** A gzipped stream of bytes is given as a stream of ``ByteString`` s, for example from a ``FileIO`` source. + +The :class:`Compression` helper object contains convenience methods for decompressing data streams compressed with +Gzip or Deflate. + +.. includecode:: ../code/docs/stream/cookbook/RecipeDecompress.scala#decompress-gzip + Implementing reduce-by-key -------------------------- diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/CompressionSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/CompressionSpec.scala new file mode 100644 index 0000000000..43dbb04dee --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/CompressionSpec.scala @@ -0,0 +1,45 @@ +/** + * Copyright (C) 2016 Lightbend Inc. 
+ */ +package akka.stream.scaladsl + +import java.nio.charset.StandardCharsets + +import akka.stream.impl.io.compression.{ DeflateCompressor, GzipCompressor } +import akka.stream.testkit.StreamSpec +import akka.stream.testkit.scaladsl.TestSink +import akka.stream.{ ActorMaterializer, ActorMaterializerSettings } +import akka.util.ByteString + +class CompressionSpec extends StreamSpec { + val settings = ActorMaterializerSettings(system) + implicit val materializer = ActorMaterializer(settings) + + def gzip(s: String): ByteString = new GzipCompressor().compressAndFinish(ByteString(s)) + + def deflate(s: String): ByteString = new DeflateCompressor().compressAndFinish(ByteString(s)) + + val data = "hello world" + + "Gzip decompression" must { + "be able to decompress a gzipped stream" in { + Source.single(gzip(data)) + .via(Compression.gunzip()) + .map(_.decodeString(StandardCharsets.UTF_8)) + .runWith(TestSink.probe) + .requestNext(data) + .expectComplete() + } + } + + "Deflate decompression" must { + "be able to decompress a deflated stream" in { + Source.single(deflate(data)) + .via(Compression.inflate()) + .map(_.decodeString(StandardCharsets.UTF_8)) + .runWith(TestSink.probe) + .requestNext(data) + .expectComplete() + } + } +} diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/compression/CompressionUtils.scala b/akka-stream/src/main/scala/akka/stream/impl/io/compression/CompressionUtils.scala new file mode 100644 index 0000000000..6d5b7d923e --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/impl/io/compression/CompressionUtils.scala @@ -0,0 +1,44 @@ +/** + * Copyright (C) 2009-2016 Lightbend Inc. + */ +package akka.stream.impl.io.compression + +import akka.NotUsed +import akka.stream.{ Attributes, FlowShape } +import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage +import akka.stream.scaladsl.Flow +import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler } +import akka.util.ByteString + +/** INTERNAL API */ +private[stream] object CompressionUtils { + /** + * Creates a flow from a compressor constructor. + */ + def compressorFlow(newCompressor: () ⇒ Compressor): Flow[ByteString, ByteString, NotUsed] = + Flow.fromGraph { + new SimpleLinearGraphStage[ByteString] { + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler { + val compressor = newCompressor() + + override def onPush(): Unit = { + val data = compressor.compress(grab(in)) + if (data.nonEmpty) push(out, data) + else pull(in) + } + + override def onPull(): Unit = pull(in) + + override def onUpstreamFinish(): Unit = { + val data = compressor.finish() + if (data.nonEmpty) emit(out, data) + completeStage() + } + + override def postStop(): Unit = compressor.close() + + setHandlers(in, out, this) + } + } + } +} \ No newline at end of file diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/compression/Compressor.scala b/akka-stream/src/main/scala/akka/stream/impl/io/compression/Compressor.scala new file mode 100644 index 0000000000..5e49a17813 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/impl/io/compression/Compressor.scala @@ -0,0 +1,40 @@ +/** + * Copyright (C) 2009-2016 Lightbend Inc. + */ +package akka.stream.impl.io.compression + +import akka.util.ByteString + +/** + * INTERNAL API + * + * A stateful object representing ongoing compression. + */ +private[akka] abstract class Compressor { + /** + * Compresses the given input and returns compressed data. 
+ */
+private[akka] abstract class Compressor {
+  /**
+   * Compresses the given input and returns compressed data. The implementation
+   * may buffer output data to improve compression. Use `flush` or
+   * `compressAndFlush` to make sure that all input data has been compressed
+   * and pending output data has been returned.
+   */
+  def compress(input: ByteString): ByteString
+
+  /**
+   * Flushes any output data and returns the currently remaining compressed data.
+   */
+  def flush(): ByteString
+
+  /**
+   * Closes this compressed stream and returns the remaining compressed data. After
+   * calling this method, this Compressor cannot be used any further.
+   */
+  def finish(): ByteString
+
+  /** Combines `compress` + `flush` */
+  def compressAndFlush(input: ByteString): ByteString
+  /** Combines `compress` + `finish` */
+  def compressAndFinish(input: ByteString): ByteString
+
+  /** Makes sure any resources have been released */
+  def close(): Unit
+}
diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/compression/DeflateCompressor.scala b/akka-stream/src/main/scala/akka/stream/impl/io/compression/DeflateCompressor.scala
new file mode 100644
index 0000000000..8eb4e01270
--- /dev/null
+++ b/akka-stream/src/main/scala/akka/stream/impl/io/compression/DeflateCompressor.scala
@@ -0,0 +1,81 @@
+/**
+ * Copyright (C) 2009-2016 Lightbend Inc.
+ */
+package akka.stream.impl.io.compression
+
+import java.util.zip.Deflater
+
+import akka.util.{ ByteString, ByteStringBuilder }
+
+import scala.annotation.tailrec
+
+/** INTERNAL API */
+private[akka] class DeflateCompressor extends Compressor {
+  import DeflateCompressor._
+
+  protected lazy val deflater = new Deflater(Deflater.BEST_COMPRESSION, false)
+
+  override final def compressAndFlush(input: ByteString): ByteString = {
+    val buffer = newTempBuffer(input.size)
+
+    compressWithBuffer(input, buffer) ++ flushWithBuffer(buffer)
+  }
+  override final def compressAndFinish(input: ByteString): ByteString = {
+    val buffer = newTempBuffer(input.size)
+
+    compressWithBuffer(input, buffer) ++ finishWithBuffer(buffer)
+  }
+  override final def compress(input: ByteString): ByteString = compressWithBuffer(input, newTempBuffer())
+  override final def flush(): ByteString = flushWithBuffer(newTempBuffer())
+  override final def finish(): ByteString = finishWithBuffer(newTempBuffer())
+
+  protected def compressWithBuffer(input: ByteString, buffer: Array[Byte]): ByteString = {
+    require(deflater.needsInput())
+    deflater.setInput(input.toArray)
+    drainDeflater(deflater, buffer)
+  }
+  protected def flushWithBuffer(buffer: Array[Byte]): ByteString = {
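+    // SYNC_FLUSH forces the deflater to emit all pending output and align on a
+    // byte boundary, so every flushed chunk can be decompressed on its own.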
+    val written = deflater.deflate(buffer, 0, buffer.length, Deflater.SYNC_FLUSH)
+    ByteString.fromArray(buffer, 0, written)
+  }
+  protected def finishWithBuffer(buffer: Array[Byte]): ByteString = {
+    deflater.finish()
+    val res = drainDeflater(deflater, buffer)
+    deflater.end()
+    res
+  }
+
+  def close(): Unit = deflater.end()
+
+  private def newTempBuffer(size: Int = 65536): Array[Byte] = {
+    // The default size is somewhat arbitrary; we'd like to guess a better value, but Deflater/zlib
+    // buffers in an unpredictable manner.
+    // `compress` will only return any data once the buffered compressed data reaches a size
+    // in the region of 10000-50000 bytes.
+    // `flush` and `finish` may return any size depending on the previous input.
+    // This value will hopefully provide a good compromise between memory churn and
+    // excessive fragmentation of ByteStrings.
+    // We also make sure that the buffer size stays above a minimum, to avoid
+    // draining the deflater with a buffer that is too small.
+    new Array[Byte](math.max(size, MinBufferSize))
+  }
+}
+
+/** INTERNAL API */
+private[akka] object DeflateCompressor {
+  val MinBufferSize = 1024
+
+  @tailrec
+  def drainDeflater(deflater: Deflater, buffer: Array[Byte], result: ByteStringBuilder = new ByteStringBuilder()): ByteString = {
+    val len = deflater.deflate(buffer)
+    if (len > 0) {
+      result ++= ByteString.fromArray(buffer, 0, len)
+      drainDeflater(deflater, buffer, result)
+    } else {
+      require(deflater.needsInput())
+      result.result()
+    }
+  }
+}
+
diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/compression/DeflateDecompressor.scala b/akka-stream/src/main/scala/akka/stream/impl/io/compression/DeflateDecompressor.scala
new file mode 100644
index 0000000000..85ab296f61
--- /dev/null
+++ b/akka-stream/src/main/scala/akka/stream/impl/io/compression/DeflateDecompressor.scala
@@ -0,0 +1,30 @@
+/**
+ * Copyright (C) 2009-2016 Lightbend Inc.
+ */
+package akka.stream.impl.io.compression
+
+import java.util.zip.Inflater
+
+import akka.stream.Attributes
+import akka.stream.impl.io.ByteStringParser
+import akka.stream.impl.io.ByteStringParser.{ ParseResult, ParseStep }
+import akka.util.ByteString
+
+/** INTERNAL API */
+private[akka] class DeflateDecompressor(maxBytesPerChunk: Int = DeflateDecompressorBase.MaxBytesPerChunkDefault)
+  extends DeflateDecompressorBase(maxBytesPerChunk) {
+
+  override def createLogic(attr: Attributes) = new DecompressorParsingLogic {
+    override val inflater: Inflater = new Inflater()
+
+    override val inflateState = new Inflate(true) {
+      override def onTruncation(): Unit = completeStage()
+    }
+
+    override def afterInflate = inflateState
+    override def afterBytesRead(buffer: Array[Byte], offset: Int, length: Int): Unit = {}
+
+    startWith(inflateState)
+  }
+}
+
diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/compression/DeflateDecompressorBase.scala b/akka-stream/src/main/scala/akka/stream/impl/io/compression/DeflateDecompressorBase.scala
new file mode 100644
index 0000000000..a0f4590e58
--- /dev/null
+++ b/akka-stream/src/main/scala/akka/stream/impl/io/compression/DeflateDecompressorBase.scala
@@ -0,0 +1,52 @@
+/**
+ * Copyright (C) 2009-2016 Lightbend Inc.
+ */
+package akka.stream.impl.io.compression
+
+import java.util.zip.Inflater
+
+import akka.stream.impl.io.ByteStringParser
+import akka.stream.impl.io.ByteStringParser.{ ParseResult, ParseStep }
+import akka.util.ByteString
+
+/** INTERNAL API */
+private[akka] abstract class DeflateDecompressorBase(maxBytesPerChunk: Int = DeflateDecompressorBase.MaxBytesPerChunkDefault)
+  extends ByteStringParser[ByteString] {
+
+  abstract class DecompressorParsingLogic extends ParsingLogic {
+    val inflater: Inflater
+    def afterInflate: ParseStep[ByteString]
+    def afterBytesRead(buffer: Array[Byte], offset: Int, length: Int): Unit
+    val inflateState: Inflate
+
+    abstract class Inflate(noPostProcessing: Boolean) extends ParseStep[ByteString] {
+      override def canWorkWithPartialData = true
+      override def parse(reader: ByteStringParser.ByteReader): ParseResult[ByteString] = {
+        inflater.setInput(reader.remainingData.toArray)
+
+        val buffer = new Array[Byte](maxBytesPerChunk)
+        val read = inflater.inflate(buffer)
+
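+        // Advance the reader only past the bytes the inflater actually consumed;
+        // unconsumed input stays in the reader for the next parse round.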
+ */ +package akka.stream.impl.io.compression + +import java.util.zip.Inflater + +import akka.stream.impl.io.ByteStringParser +import akka.stream.impl.io.ByteStringParser.{ ParseResult, ParseStep } +import akka.util.ByteString + +/** INTERNAL API */ +private[akka] abstract class DeflateDecompressorBase(maxBytesPerChunk: Int = DeflateDecompressorBase.MaxBytesPerChunkDefault) + extends ByteStringParser[ByteString] { + + abstract class DecompressorParsingLogic extends ParsingLogic { + val inflater: Inflater + def afterInflate: ParseStep[ByteString] + def afterBytesRead(buffer: Array[Byte], offset: Int, length: Int): Unit + val inflateState: Inflate + + abstract class Inflate(noPostProcessing: Boolean) extends ParseStep[ByteString] { + override def canWorkWithPartialData = true + override def parse(reader: ByteStringParser.ByteReader): ParseResult[ByteString] = { + inflater.setInput(reader.remainingData.toArray) + + val buffer = new Array[Byte](maxBytesPerChunk) + val read = inflater.inflate(buffer) + + reader.skip(reader.remainingSize - inflater.getRemaining) + + if (read > 0) { + afterBytesRead(buffer, 0, read) + val next = if (inflater.finished()) afterInflate else this + ParseResult(Some(ByteString.fromArray(buffer, 0, read)), next, noPostProcessing) + } else { + if (inflater.finished()) ParseResult(None, afterInflate, noPostProcessing) + else throw ByteStringParser.NeedMoreData + } + } + } + + override def postStop(): Unit = inflater.end() + } +} + +/** INTERNAL API */ +private[akka] object DeflateDecompressorBase { + final val MaxBytesPerChunkDefault = 64 * 1024 +} diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/compression/GzipCompressor.scala b/akka-stream/src/main/scala/akka/stream/impl/io/compression/GzipCompressor.scala new file mode 100644 index 0000000000..8f9321d23b --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/impl/io/compression/GzipCompressor.scala @@ -0,0 +1,42 @@ +/** + * Copyright (C) 2009-2016 Lightbend Inc. 
+ */ +package akka.stream.impl.io.compression + +import java.util.zip.{ CRC32, Deflater } + +import akka.util.ByteString + +/** INTERNAL API */ +private[akka] class GzipCompressor extends DeflateCompressor { + override protected lazy val deflater = new Deflater(Deflater.BEST_COMPRESSION, true) + private val checkSum = new CRC32 // CRC32 of uncompressed data + private var headerSent = false + private var bytesRead = 0L + + override protected def compressWithBuffer(input: ByteString, buffer: Array[Byte]): ByteString = { + updateCrc(input) + header() ++ super.compressWithBuffer(input, buffer) + } + override protected def flushWithBuffer(buffer: Array[Byte]): ByteString = header() ++ super.flushWithBuffer(buffer) + override protected def finishWithBuffer(buffer: Array[Byte]): ByteString = header() ++ super.finishWithBuffer(buffer) ++ trailer() + + private def updateCrc(input: ByteString): Unit = { + checkSum.update(input.toArray) + bytesRead += input.length + } + private def header(): ByteString = + if (!headerSent) { + headerSent = true + GzipDecompressor.Header + } else ByteString.empty + + private def trailer(): ByteString = { + def int32(i: Int): ByteString = ByteString(i, i >> 8, i >> 16, i >> 24) + val crc = checkSum.getValue.toInt + val tot = bytesRead.toInt // truncated to 32bit as specified in https://tools.ietf.org/html/rfc1952#section-2 + val trailer = int32(crc) ++ int32(tot) + + trailer + } +} diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/compression/GzipDecompressor.scala b/akka-stream/src/main/scala/akka/stream/impl/io/compression/GzipDecompressor.scala new file mode 100644 index 0000000000..87e2d3dcac --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/impl/io/compression/GzipDecompressor.scala @@ -0,0 +1,83 @@ +/** + * Copyright (C) 2009-2016 Lightbend Inc. 
+ */ +package akka.stream.impl.io.compression + +import java.util.zip.{ CRC32, Inflater, ZipException } + +import akka.stream.Attributes +import akka.stream.impl.io.ByteStringParser +import akka.stream.impl.io.ByteStringParser.{ ParseResult, ParseStep } +import akka.util.ByteString + +/** INTERNAL API */ +private[akka] class GzipDecompressor(maxBytesPerChunk: Int = DeflateDecompressorBase.MaxBytesPerChunkDefault) + extends DeflateDecompressorBase(maxBytesPerChunk) { + + override def createLogic(attr: Attributes) = new DecompressorParsingLogic { + override val inflater: Inflater = new Inflater(true) + override def afterInflate: ParseStep[ByteString] = ReadTrailer + override def afterBytesRead(buffer: Array[Byte], offset: Int, length: Int): Unit = + crc32.update(buffer, offset, length) + + trait Step extends ParseStep[ByteString] { + override def onTruncation(): Unit = failStage(new ZipException("Truncated GZIP stream")) + } + override val inflateState = new Inflate(false) with Step + startWith(ReadHeaders) + + /** Reading the header bytes */ + case object ReadHeaders extends Step { + override def parse(reader: ByteStringParser.ByteReader): ParseResult[ByteString] = { + import reader._ + if (readByte() != 0x1F || readByte() != 0x8B) fail("Not in GZIP format") // check magic header + if (readByte() != 8) fail("Unsupported GZIP compression method") // check compression method + val flags = readByte() + skip(6) // skip MTIME, XFL and OS fields + if ((flags & 4) > 0) skip(readShortLE()) // skip optional extra fields + if ((flags & 8) > 0) skipZeroTerminatedString() // skip optional file name + if ((flags & 16) > 0) skipZeroTerminatedString() // skip optional file comment + if ((flags & 2) > 0 && crc16(fromStartToHere) != readShortLE()) fail("Corrupt GZIP header") + + inflater.reset() + crc32.reset() + ParseResult(None, inflateState, false) + } + } + var crc32: CRC32 = new CRC32 + private def fail(msg: String) = throw new ZipException(msg) + + /** Reading the trailer */ + case object ReadTrailer extends Step { + override def parse(reader: ByteStringParser.ByteReader): ParseResult[ByteString] = { + import reader._ + if (readIntLE() != crc32.getValue.toInt) fail("Corrupt data (CRC32 checksum error)") + if (readIntLE() != inflater.getBytesWritten.toInt /* truncated to 32bit */ ) + fail("Corrupt GZIP trailer ISIZE") + ParseResult(None, ReadHeaders, true) + } + } + } + private def crc16(data: ByteString) = { + val crc = new CRC32 + crc.update(data.toArray) + crc.getValue.toInt & 0xFFFF + } +} + +/** INTERNAL API */ +private[akka] object GzipDecompressor { + // RFC 1952: http://tools.ietf.org/html/rfc1952 section 2.2 + private[impl] val Header = ByteString( + 0x1F, // ID1 + 0x8B, // ID2 + 8, // CM = Deflate + 0, // FLG + 0, // MTIME 1 + 0, // MTIME 2 + 0, // MTIME 3 + 0, // MTIME 4 + 0, // XFL + 0 // OS + ) +} diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Compression.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Compression.scala new file mode 100644 index 0000000000..d711640b1d --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Compression.scala @@ -0,0 +1,26 @@ +/** + * Copyright (C) 2016 Lightbend Inc. + */ +package akka.stream.javadsl + +import akka.NotUsed +import akka.stream.scaladsl +import akka.util.ByteString + +object Compression { + /** + * Creates a Flow that decompresses gzip-compressed stream of data. + * + * @param maxBytesPerChunk Maximum length of the output [[ByteString]] chunk. 
+ */ + def gunzip(maxBytesPerChunk: Int): Flow[ByteString, ByteString, NotUsed] = + scaladsl.Compression.gunzip(maxBytesPerChunk).asJava + + /** + * Creates a Flow that decompresses deflate-compressed stream of data. + * + * @param maxBytesPerChunk Maximum length of the output [[ByteString]] chunk. + */ + def inflate(maxBytesPerChunk: Int): Flow[ByteString, ByteString, NotUsed] = + scaladsl.Compression.inflate(maxBytesPerChunk).asJava +} diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Compression.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Compression.scala new file mode 100644 index 0000000000..e7e77821cd --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Compression.scala @@ -0,0 +1,50 @@ +/** + * Copyright (C) 2016 Lightbend Inc. + */ +package akka.stream.scaladsl + +import akka.NotUsed +import akka.stream.impl.io.compression._ +import akka.util.ByteString + +object Compression { + /** + * Creates a flow that gzip-compresses a stream of ByteStrings. Note that the compressor + * will SYNC_FLUSH after every [[ByteString]] so that it is guaranteed that every [[ByteString]] + * coming out of the flow can be fully decompressed without waiting for additional data. This may + * come at a compression performance cost for very small chunks. + * + * FIXME: should compression level / strategy / flush mode be configurable? See https://github.com/akka/akka/issues/21849 + */ + def gzip: Flow[ByteString, ByteString, NotUsed] = + CompressionUtils.compressorFlow(() ⇒ new GzipCompressor) + + /** + * Creates a Flow that decompresses a gzip-compressed stream of data. + * + * @param maxBytesPerChunk Maximum length of an output [[ByteString]] chunk. + */ + def gunzip(maxBytesPerChunk: Int = DeflateDecompressorBase.MaxBytesPerChunkDefault): Flow[ByteString, ByteString, NotUsed] = + Flow[ByteString].via(new GzipDecompressor(maxBytesPerChunk)) + .named("gunzip") + + /** + * Creates a flow that deflate-compresses a stream of ByteString. Note that the compressor + * will SYNC_FLUSH after every [[ByteString]] so that it is guaranteed that every [[ByteString]] + * coming out of the flow can be fully decompressed without waiting for additional data. This may + * come at a compression performance cost for very small chunks. + * + * FIXME: should compression level / strategy / flush mode be configurable? See https://github.com/akka/akka/issues/21849 + */ + def deflate: Flow[ByteString, ByteString, NotUsed] = + CompressionUtils.compressorFlow(() ⇒ new DeflateCompressor) + + /** + * Creates a Flow that decompresses a deflate-compressed stream of data. + * + * @param maxBytesPerChunk Maximum length of an output [[ByteString]] chunk. + */ + def inflate(maxBytesPerChunk: Int = DeflateDecompressorBase.MaxBytesPerChunkDefault): Flow[ByteString, ByteString, NotUsed] = + Flow[ByteString].via(new DeflateDecompressor(maxBytesPerChunk)) + .named("inflate") +}