diff --git a/akka-docs/src/main/categories/compression-operators.md b/akka-docs/src/main/categories/compression-operators.md new file mode 100644 index 0000000000..e3ed3f74c6 --- /dev/null +++ b/akka-docs/src/main/categories/compression-operators.md @@ -0,0 +1 @@ +Flow operators to (de)compress. \ No newline at end of file diff --git a/akka-docs/src/main/paradox/stream/operators/Compression/deflate.md b/akka-docs/src/main/paradox/stream/operators/Compression/deflate.md new file mode 100644 index 0000000000..d98c69355c --- /dev/null +++ b/akka-docs/src/main/paradox/stream/operators/Compression/deflate.md @@ -0,0 +1,30 @@ +# Compression.deflate + +Creates a flow that deflate-compresses a stream of ByteStrings. + +@ref[Compression operators](../index.md#compression-operators) + +## Signature + +@apidoc[Compression.deflate](stream.*.Compression$) { scala="#deflate:akka.stream.scaladsl.Flow[akka.util.ByteString,akka.util.ByteString,akka.NotUsed]" java="#deflate()" } + +## Description + +Creates a flow that deflate-compresses a stream of ByteStrings. Note that the compressor +will SYNC_FLUSH after every @apidoc[akka.util.ByteString] so that it is guaranteed that every @apidoc[akka.util.ByteString] +coming out of the flow can be fully decompressed without waiting for additional data. This may +come at a compression performance cost for very small chunks. + +Use the overload method with parameters to control the compression level and compatibility with GZip. + +## Reactive Streams semantics + +@@@div { .callout } + +**emits** when the compression algorithm produces output for the received `ByteString` + +**backpressures** when downstream backpressures + +**completes** when upstream completes (may emit finishing bytes in an extra `ByteString` ) + +@@@ diff --git a/akka-docs/src/main/paradox/stream/operators/Compression/gzip.md b/akka-docs/src/main/paradox/stream/operators/Compression/gzip.md new file mode 100644 index 0000000000..a938a8bf84 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/operators/Compression/gzip.md @@ -0,0 +1,30 @@ +# Compression.gzip + +Creates a flow that gzip-compresses a stream of ByteStrings. + +@ref[Compression operators](../index.md#compression-operators) + +## Signature + +@apidoc[Compression.gzip](stream.*.Compression$) { scala="#gzip:akka.stream.scaladsl.Flow[akka.util.ByteString,akka.util.ByteString,akka.NotUsed]" java="#gzip()" } + +## Description + +Creates a flow that gzip-compresses a stream of ByteStrings. Note that the compressor +will SYNC_FLUSH after every @apidoc[akka.util.ByteString] so that it is guaranteed that every @apidoc[akka.util.ByteString] +coming out of the flow can be fully decompressed without waiting for additional data. This may +come at a compression performance cost for very small chunks. + +Use the overload method to control the compression level. + +## Reactive Streams semantics + +@@@div { .callout } + +**emits** when the compression algorithm produces output for the received `ByteString` + +**backpressures** when downstream backpressures + +**completes** when upstream completes + +@@@ diff --git a/akka-docs/src/main/paradox/stream/operators/index.md b/akka-docs/src/main/paradox/stream/operators/index.md index 6175ada555..784d2ad61d 100644 --- a/akka-docs/src/main/paradox/stream/operators/index.md +++ b/akka-docs/src/main/paradox/stream/operators/index.md @@ -323,6 +323,15 @@ Operators meant for inter-operating between Akka Streams and Actors: |ActorSink|@ref[actorRefWithBackpressure](ActorSink/actorRefWithBackpressure.md)|Sends the elements of the stream to the given @java[`ActorRef`]@scala[`ActorRef[T]`] with backpressure, to be able to signal demand when the actor is ready to receive more elements.| |ActorFlow|@ref[ask](ActorFlow/ask.md)|Use the `AskPattern` to send each element as an `ask` to the target actor, and expect a reply back that will be sent further downstream.| +## Compression operators + +Flow operators to (de)compress. + +| |Operator|Description| +|--|--|--| +|Compression|@ref[deflate](Compression/deflate.md)|Creates a flow that deflate-compresses a stream of ByteStrings. | +|Compression|@ref[gzip](Compression/gzip.md)|Creates a flow that gzip-compresses a stream of ByteStrings. | + ## Error handling For more background see the @ref[Error Handling in Streams](../stream-error.md) section. @@ -516,6 +525,8 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md) * [withBackoff](RestartSink/withBackoff.md) * [withBackoff](RetryFlow/withBackoff.md) * [withBackoffAndContext](RetryFlow/withBackoffAndContext.md) +* [gzip](Compression/gzip.md) +* [deflate](Compression/deflate.md) * [actorRef](ActorSource/actorRef.md) * [actorRefWithBackpressure](ActorSource/actorRefWithBackpressure.md) * [ask](ActorFlow/ask.md) diff --git a/project/StreamOperatorsIndexGenerator.scala b/project/StreamOperatorsIndexGenerator.scala index d3f14b8452..243e471c40 100644 --- a/project/StreamOperatorsIndexGenerator.scala +++ b/project/StreamOperatorsIndexGenerator.scala @@ -33,6 +33,7 @@ object StreamOperatorsIndexGenerator extends AutoPlugin { "Fan-out operators", "Watching status operators", "Actor interop operators", + "Compression operators", "Error handling") def categoryId(name: String): String = name.toLowerCase.replace(' ', '-') @@ -98,7 +99,12 @@ object StreamOperatorsIndexGenerator extends AutoPlugin { "actorSubscriber", "foldAsync", "newOnCompleteStage", + ), + "Compression" -> Seq( + "inflate", + "gunzip", ) + ) val ignore = @@ -151,6 +157,8 @@ object StreamOperatorsIndexGenerator extends AutoPlugin { "akka-stream/src/main/scala/akka/stream/javadsl/RestartSink.scala", "akka-stream/src/main/scala/akka/stream/scaladsl/RetryFlow.scala", "akka-stream/src/main/scala/akka/stream/javadsl/RetryFlow.scala", + "akka-stream/src/main/scala/akka/stream/scaladsl/Compression.scala", + "akka-stream/src/main/scala/akka/stream/javadsl/Compression.scala", // akka-stream-typed "akka-stream-typed/src/main/scala/akka/stream/typed/javadsl/ActorSource.scala", "akka-stream-typed/src/main/scala/akka/stream/typed/scaladsl/ActorSource.scala",