diff --git a/akka-http-core/src/main/scala/akka/http/model/HttpEntity.scala b/akka-http-core/src/main/scala/akka/http/model/HttpEntity.scala index 1d7ce9086c..9736e4b038 100644 --- a/akka-http-core/src/main/scala/akka/http/model/HttpEntity.scala +++ b/akka-http-core/src/main/scala/akka/http/model/HttpEntity.scala @@ -14,11 +14,14 @@ import scala.concurrent.{ Future, ExecutionContext } import scala.concurrent.duration.FiniteDuration import scala.collection.immutable import akka.util.ByteString -import akka.stream.{ TimerTransformer, FlowMaterializer } +import akka.stream.{ Transformer, TimerTransformer, FlowMaterializer } import akka.stream.scaladsl.Flow import akka.stream.impl.{ EmptyPublisher, SynchronousPublisherFromIterable } +import akka.http.util._ import japi.JavaMapping.Implicits._ +import scala.util.control.NonFatal + /** * Models the entity (aka "body" or "content") of an HTTP message. */ @@ -63,6 +66,16 @@ sealed trait HttpEntity extends japi.HttpEntity { Flow(dataBytes).timerTransform("toStrict", transformer).toFuture() } + /** + * Returns a copy of this entity with its ByteString chunks transformed by the given transformer. + * For a `Chunked` entity, the chunks are transformed one by one, keeping the chunk metadata (but an extra + * chunk may be introduced before the `LastChunk` if `transformer.onTermination` returns additional data). + * + * This method itself only throws if the `transformer` factory throws while creating the transformer. + * Any other errors are reported through the new entity's data stream. + */ + def transformDataBytes(transformer: () ⇒ Transformer[ByteString, ByteString])(implicit materializer: FlowMaterializer): HttpEntity + /** * Creates a copy of this HttpEntity with the `contentType` overridden with the given one.
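+ *
+ * A hypothetical usage sketch, with `entity` standing for any HttpEntity value:
+ * {{{
+ * val opaque = entity.withContentType(ContentTypes.`application/octet-stream`)
+ * // yields `this` unchanged if the entity already carries the given content type
+ * }}}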
*/ @@ -85,10 +98,14 @@ sealed trait BodyPartEntity extends HttpEntity with japi.BodyPartEntity { /* An entity that can be used for requests */ sealed trait RequestEntity extends HttpEntity with japi.RequestEntity with ResponseEntity { def withContentType(contentType: ContentType): RequestEntity + + override def transformDataBytes(transformer: () ⇒ Transformer[ByteString, ByteString])(implicit materializer: FlowMaterializer): RequestEntity } /* An entity that can be used for responses */ sealed trait ResponseEntity extends HttpEntity with japi.ResponseEntity { def withContentType(contentType: ContentType): ResponseEntity + + override def transformDataBytes(transformer: () ⇒ Transformer[ByteString, ByteString])(implicit materializer: FlowMaterializer): ResponseEntity } /* An entity that can be used for requests, responses, and body parts */ sealed trait UniversalEntity extends japi.UniversalEntity with MessageEntity with BodyPartEntity { @@ -136,6 +153,17 @@ object HttpEntity { override def toStrict(timeout: FiniteDuration)(implicit ec: ExecutionContext, fm: FlowMaterializer) = FastFuture.successful(this) + override def transformDataBytes(transformer: () ⇒ Transformer[ByteString, ByteString])(implicit materializer: FlowMaterializer): MessageEntity = { + try { + val t = transformer() + val newData = (t.onNext(data) ++ t.onTermination(None)).join + copy(data = newData) + } catch { + case NonFatal(ex) ⇒ + Chunked(contentType, StreamUtils.failedPublisher(ex)) + } + } + def withContentType(contentType: ContentType): Strict = if (contentType == this.contentType) this else copy(contentType = contentType) } @@ -154,6 +182,12 @@ object HttpEntity { def dataBytes(implicit fm: FlowMaterializer): Publisher[ByteString] = data + override def transformDataBytes(transformer: () ⇒ Transformer[ByteString, ByteString])(implicit materializer: FlowMaterializer): Chunked = { + val chunks = Flow(data).transform("transformDataBytes-Default", () ⇒ transformer().map(Chunk(_): ChunkStreamPart)).toPublisher() + + HttpEntity.Chunked(contentType, chunks) + } + def withContentType(contentType: ContentType): Default = if (contentType == this.contentType) this else copy(contentType = contentType) } @@ -184,6 +218,10 @@ object HttpEntity { override def isCloseDelimited: Boolean = true def withContentType(contentType: ContentType): CloseDelimited = if (contentType == this.contentType) this else copy(contentType = contentType) + + override def transformDataBytes(transformer: () ⇒ Transformer[ByteString, ByteString])(implicit materializer: FlowMaterializer): CloseDelimited = + HttpEntity.CloseDelimited(contentType, + Flow(data).transform("transformDataBytes-CloseDelimited", transformer).toPublisher()) } /** @@ -196,6 +234,10 @@ object HttpEntity { override def isIndefiniteLength: Boolean = true def withContentType(contentType: ContentType): IndefiniteLength = if (contentType == this.contentType) this else copy(contentType = contentType) + + override def transformDataBytes(transformer: () ⇒ Transformer[ByteString, ByteString])(implicit materializer: FlowMaterializer): IndefiniteLength = + HttpEntity.IndefiniteLength(contentType, + Flow(data).transform("transformDataBytes-IndefiniteLength", transformer).toPublisher()) } /** @@ -210,6 +252,35 @@ object HttpEntity { def dataBytes(implicit fm: FlowMaterializer): Publisher[ByteString] = Flow(chunks).map(_.data).filter(_.nonEmpty).toPublisher() + override def transformDataBytes(transformer: () ⇒ Transformer[ByteString, ByteString])(implicit materializer: FlowMaterializer): Chunked = { 
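+ // Each incoming Chunk is passed through the byte transformer with its extension
+ // metadata preserved. A LastChunk first flushes the byte transformer via
+ // onTermination(None), so pending output is emitted as an extra Chunk before
+ // the LastChunk itself; the outer onTermination below covers streams that end
+ // without an explicit LastChunk.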
+ val newChunks = + Flow(chunks).transform("transformDataBytes-Chunked", () ⇒ new Transformer[ChunkStreamPart, ChunkStreamPart] { + val byteTransformer = transformer() + var sentLastChunk = false + + override def isComplete: Boolean = byteTransformer.isComplete + + def onNext(element: ChunkStreamPart): immutable.Seq[ChunkStreamPart] = element match { + case Chunk(data, ext) ⇒ Chunk(byteTransformer.onNext(data).join, ext) :: Nil + case l: LastChunk ⇒ + sentLastChunk = true + Chunk(byteTransformer.onTermination(None).join) :: l :: Nil + } + + override def onTermination(e: Option[Throwable]): immutable.Seq[ChunkStreamPart] = { + val remaining = + if (e.isEmpty && !sentLastChunk) byteTransformer.onTermination(None) + else if (e.isDefined /* && sentLastChunk */ ) byteTransformer.onTermination(e) + else Nil + + if (remaining.nonEmpty) Chunk(remaining.join) :: Nil + else Nil + } + }).toPublisher() + + HttpEntity.Chunked(contentType, newChunks) + } + def withContentType(contentType: ContentType): Chunked = if (contentType == this.contentType) this else copy(contentType = contentType) diff --git a/akka-http-core/src/main/scala/akka/http/model/HttpMessage.scala b/akka-http-core/src/main/scala/akka/http/model/HttpMessage.scala index 454e9d7b9e..cd9d9b0384 100644 --- a/akka-http-core/src/main/scala/akka/http/model/HttpMessage.scala +++ b/akka-http-core/src/main/scala/akka/http/model/HttpMessage.scala @@ -21,6 +21,7 @@ import FastFuture._ */ sealed trait HttpMessage extends japi.HttpMessage { type Self <: HttpMessage + def self: Self def isRequest: Boolean def isResponse: Boolean @@ -134,6 +135,7 @@ final case class HttpRequest(method: HttpMethod = HttpMethods.GET, "HTTP/1.0 requests must not have a chunked entity") type Self = HttpRequest + def self = this override def isRequest = true override def isResponse = false @@ -288,6 +290,7 @@ final case class HttpResponse(status: StatusCode = StatusCodes.OK, entity: ResponseEntity = HttpEntity.Empty, protocol: HttpProtocol = HttpProtocols.`HTTP/1.1`) extends japi.HttpResponse with HttpMessage { type Self = HttpResponse + def self = this override def isRequest = false override def isResponse = true diff --git a/akka-http-core/src/main/scala/akka/http/util/EnhancedByteStringTraversableOnce.scala b/akka-http-core/src/main/scala/akka/http/util/EnhancedByteStringTraversableOnce.scala new file mode 100644 index 0000000000..c7529e578f --- /dev/null +++ b/akka-http-core/src/main/scala/akka/http/util/EnhancedByteStringTraversableOnce.scala @@ -0,0 +1,14 @@ +/* + * Copyright (C) 2009-2014 Typesafe Inc. + */ + +package akka.http.util + +import akka.util.ByteString + +/** + * INTERNAL API + */ +private[http] class EnhancedByteStringTraversableOnce(val byteStrings: TraversableOnce[ByteString]) extends AnyVal { + def join: ByteString = byteStrings.foldLeft(ByteString.empty)(_ ++ _) +} diff --git a/akka-http-core/src/main/scala/akka/http/util/StreamUtils.scala b/akka-http-core/src/main/scala/akka/http/util/StreamUtils.scala new file mode 100644 index 0000000000..5a3ea605bc --- /dev/null +++ b/akka-http-core/src/main/scala/akka/http/util/StreamUtils.scala @@ -0,0 +1,54 @@ +/* + * Copyright (C) 2009-2014 Typesafe Inc. 
+ */ + +package akka.http.util + +import akka.stream.impl.ErrorPublisher +import akka.stream.scaladsl.Flow +import akka.stream.{ FlowMaterializer, Transformer } +import akka.util.ByteString +import org.reactivestreams.Publisher + +import scala.collection.immutable + +/** + * INTERNAL API + */ +private[http] object StreamUtils { + /** + * Maps a transformer by strictly applying the given function to each output element. + */ + def mapTransformer[T, U, V](t: Transformer[T, U], f: U ⇒ V): Transformer[T, V] = + new Transformer[T, V] { + override def isComplete: Boolean = t.isComplete + + def onNext(element: T): immutable.Seq[V] = t.onNext(element).map(f) + override def onTermination(e: Option[Throwable]): immutable.Seq[V] = t.onTermination(e).map(f) + override def onError(cause: Throwable): Unit = t.onError(cause) + override def cleanup(): Unit = t.cleanup() + } + + /** + * Creates a transformer that will call `f` for each incoming ByteString and output its result. After the complete + * input has been read it will call `finish` once to determine the final ByteString to post to the output. + */ + def byteStringTransformer(f: ByteString ⇒ ByteString, finish: () ⇒ ByteString): Transformer[ByteString, ByteString] = + new Transformer[ByteString, ByteString] { + def onNext(element: ByteString): immutable.Seq[ByteString] = f(element) :: Nil + + override def onTermination(e: Option[Throwable]): immutable.Seq[ByteString] = + if (e.isEmpty) { + val last = finish() + if (last.nonEmpty) last :: Nil + else Nil + } else super.onTermination(e) + } + + def failedPublisher[T](ex: Throwable): Publisher[T] = + ErrorPublisher(ex).asInstanceOf[Publisher[T]] +} + +private[http] class EnhancedTransformer[T, U](val t: Transformer[T, U]) extends AnyVal { + def map[V](f: U ⇒ V): Transformer[T, V] = StreamUtils.mapTransformer(t, f) +} \ No newline at end of file diff --git a/akka-http-core/src/main/scala/akka/http/util/package.scala b/akka-http-core/src/main/scala/akka/http/util/package.scala index 90bd35c5e3..31c1498219 100644 --- a/akka-http-core/src/main/scala/akka/http/util/package.scala +++ b/akka-http-core/src/main/scala/akka/http/util/package.scala @@ -30,6 +30,10 @@ package object util { private[http] implicit def enhanceConfig(config: Config): EnhancedConfig = new EnhancedConfig(config) private[http] implicit def enhanceString_(s: String): EnhancedString = new EnhancedString(s) private[http] implicit def enhanceRegex(regex: Regex): EnhancedRegex = new EnhancedRegex(regex) + private[http] implicit def enhanceByteStrings(byteStrings: TraversableOnce[ByteString]): EnhancedByteStringTraversableOnce = + new EnhancedByteStringTraversableOnce(byteStrings) + private[http] implicit def enhanceTransformer[T, U](transformer: Transformer[T, U]): EnhancedTransformer[T, U] = + new EnhancedTransformer(transformer) private[http] implicit class FlowWithHeadAndTail[T](val underlying: Flow[Publisher[T]]) extends AnyVal { def headAndTail(implicit fm: FlowMaterializer): Flow[(T, Publisher[T])] = diff --git a/akka-http-core/src/test/scala/akka/http/model/HttpEntitySpec.scala b/akka-http-core/src/test/scala/akka/http/model/HttpEntitySpec.scala index c1a711a0dd..9a09e949ff 100644 --- a/akka-http-core/src/test/scala/akka/http/model/HttpEntitySpec.scala +++ b/akka-http-core/src/test/scala/akka/http/model/HttpEntitySpec.scala @@ -7,6 +7,7 @@ package akka.http.model import java.util.concurrent.TimeoutException import com.typesafe.config.{ ConfigFactory, Config } import org.reactivestreams.Publisher +import scala.collection.immutable 
import scala.concurrent.{ Promise, Await } import scala.concurrent.duration._ import org.scalatest.{ BeforeAndAfterAll, MustMatchers, FreeSpec } @@ -14,10 +15,9 @@ import org.scalatest.matchers.Matcher import akka.util.ByteString import akka.actor.ActorSystem import akka.stream.scaladsl.Flow -import akka.stream.FlowMaterializer +import akka.stream.{ Transformer, FlowMaterializer } import akka.stream.impl.SynchronousPublisherFromIterable import akka.http.model.HttpEntity._ -import akka.http.util.FastFuture._ class HttpEntitySpec extends FreeSpec with MustMatchers with BeforeAndAfterAll { val tpe: ContentType = ContentTypes.`application/octet-stream` @@ -81,6 +81,27 @@ class HttpEntitySpec extends FreeSpec with MustMatchers with BeforeAndAfterAll { }.getMessage must be("HttpEntity.toStrict timed out after 100 milliseconds while still waiting for outstanding data") } } + "support transformDataBytes" - { + "Strict" in { + Strict(tpe, abc) must transformTo(Strict(tpe, doubleChars("abc") ++ trailer)) + } + "Default" in { + Default(tpe, 11, publisher(abc, de, fgh, ijk)) must + transformTo(Strict(tpe, doubleChars("abcdefghijk") ++ trailer)) + } + "CloseDelimited" in { + CloseDelimited(tpe, publisher(abc, de, fgh, ijk)) must + transformTo(Strict(tpe, doubleChars("abcdefghijk") ++ trailer)) + } + "Chunked w/o LastChunk" in { + Chunked(tpe, publisher(Chunk(abc), Chunk(fgh), Chunk(ijk))) must + transformTo(Strict(tpe, doubleChars("abcfghijk") ++ trailer)) + } + "Chunked with LastChunk" in { + Chunked(tpe, publisher(Chunk(abc), Chunk(fgh), Chunk(ijk), LastChunk)) must + transformTo(Strict(tpe, doubleChars("abcfghijk") ++ trailer)) + } + } } def publisher[T](elems: T*) = SynchronousPublisherFromIterable(elems.toList) @@ -93,4 +114,23 @@ class HttpEntitySpec extends FreeSpec with MustMatchers with BeforeAndAfterAll { def strictifyTo(strict: Strict): Matcher[HttpEntity] = equal(strict).matcher[Strict].compose(x ⇒ Await.result(x.toStrict(250.millis), 250.millis)) + + def transformTo(strict: Strict): Matcher[HttpEntity] = + equal(strict).matcher[Strict].compose { x ⇒ + val transformed = x.transformDataBytes(duplicateBytesTransformer) + Await.result(transformed.toStrict(250.millis), 250.millis) + } + + def duplicateBytesTransformer(): Transformer[ByteString, ByteString] = + new Transformer[ByteString, ByteString] { + def onNext(bs: ByteString): immutable.Seq[ByteString] = + Vector(doubleChars(bs)) + + override def onTermination(e: Option[Throwable]): immutable.Seq[ByteString] = + Vector(trailer) + } + + def trailer: ByteString = ByteString("--dup") + def doubleChars(bs: ByteString): ByteString = ByteString(bs.flatMap(b ⇒ Seq(b, b)): _*) + def doubleChars(str: String): ByteString = doubleChars(ByteString(str)) } diff --git a/akka-http-tests/src/test/scala/akka/http/coding/CodecSpecSupport.scala b/akka-http-tests/src/test/scala/akka/http/coding/CodecSpecSupport.scala new file mode 100644 index 0000000000..ee2c9e04dc --- /dev/null +++ b/akka-http-tests/src/test/scala/akka/http/coding/CodecSpecSupport.scala @@ -0,0 +1,79 @@ +/* + * Copyright (C) 2009-2014 Typesafe Inc. 
+ */ + +package akka.http.coding + +import org.scalatest.{ Suite, BeforeAndAfterAll, Matchers } + +import scala.concurrent.duration._ + +import akka.actor.ActorSystem +import akka.stream.FlowMaterializer +import akka.util.ByteString + +trait CodecSpecSupport extends Matchers with BeforeAndAfterAll { self: Suite ⇒ + + def readAs(string: String, charset: String = "UTF8") = equal(string).matcher[String] compose { (_: ByteString).decodeString(charset) } + def hexDump(bytes: ByteString) = bytes.map("%02x".format(_)).mkString + def fromHexDump(dump: String) = dump.grouped(2).toArray.map(chars ⇒ Integer.parseInt(new String(chars), 16).toByte) + + def printBytes(i: Int, id: String) = { + def byte(i: Int) = (i & 0xFF).toHexString + println(id + ": " + byte(i) + ":" + byte(i >> 8) + ":" + byte(i >> 16) + ":" + byte(i >> 24)) + i + } + + lazy val smallTextBytes = ByteString(smallText, "UTF8") + lazy val largeTextBytes = ByteString(largeText, "UTF8") + + val smallText = "Yeah!" + val largeText = + """Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore +magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd +gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing +elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos +et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor +sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et +dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd +gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. + +Duis autem vel eum iriure dolor in hendrerit in vulputate velit esse molestie consequat, vel illum dolore eu feugiat +nulla facilisis at vero eros et accumsan et iusto odio dignissim qui blandit praesent luptatum zzril delenit augue duis +dolore te feugait nulla facilisi. Lorem ipsum dolor sit amet, consectetuer adipiscing elit, sed diam nonummy nibh +euismod tincidunt ut laoreet dolore magna aliquam erat volutpat. + +Ut wisi enim ad minim veniam, quis nostrud exerci tation ullamcorper suscipit lobortis nisl ut aliquip ex ea commodo +consequat. Duis autem vel eum iriure dolor in hendrerit in vulputate velit esse molestie consequat, vel illum dolore eu +feugiat nulla facilisis at vero eros et accumsan et iusto odio dignissim qui blandit praesent luptatum zzril delenit +augue duis dolore te feugait nulla facilisi. + +Nam liber tempor cum soluta nobis eleifend option congue nihil imperdiet doming id quod mazim placerat facer possim +assum. Lorem ipsum dolor sit amet, consectetuer adipiscing elit, sed diam nonummy nibh euismod tincidunt ut laoreet +dolore magna aliquam erat volutpat. Ut wisi enim ad minim veniam, quis nostrud exerci tation ullamcorper suscipit +lobortis nisl ut aliquip ex ea commodo consequat. + +Duis autem vel eum iriure dolor in hendrerit in vulputate velit esse molestie consequat, vel illum dolore eu feugiat +nulla facilisis. + +At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem +ipsum dolor sit amet. 
Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt +ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. +Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, +consetetur sadipscing elitr, At accusam aliquyam diam diam dolore dolores duo eirmod eos erat, et nonumy sed tempor et +et invidunt justo labore Stet clita ea et gubergren, kasd magna no rebum. sanctus sea sed takimata ut vero voluptua. +est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor +invidunt ut labore et dolore magna aliquyam erat. + +Consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam +voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus +est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy e""".replace("\r\n", "\n") + + implicit val system = ActorSystem(getClass.getSimpleName) + implicit val materializer = FlowMaterializer() + + override def afterAll() = { + system.shutdown() + system.awaitTermination(10.seconds) + } +} \ No newline at end of file diff --git a/akka-http-tests/src/test/scala/akka/http/coding/DecoderSpec.scala b/akka-http-tests/src/test/scala/akka/http/coding/DecoderSpec.scala new file mode 100644 index 0000000000..000685eb92 --- /dev/null +++ b/akka-http-tests/src/test/scala/akka/http/coding/DecoderSpec.scala @@ -0,0 +1,39 @@ +/* + * Copyright (C) 2009-2014 Typesafe Inc. + */ + +package akka.http.coding + +import akka.util.ByteString +import org.scalatest.WordSpec +import akka.http.model._ +import headers._ +import HttpMethods.POST + +class DecoderSpec extends WordSpec with CodecSpecSupport { + + "A Decoder" should { + "not transform the message if it doesn't contain a Content-Encoding header" in { + val request = HttpRequest(POST, entity = HttpEntity(smallText)) + DummyDecoder.decode(request) === request + } + "correctly transform the message if it contains a Content-Encoding header" in { + val request = HttpRequest(POST, entity = HttpEntity(smallText), headers = List(`Content-Encoding`(DummyDecoder.encoding))) + val decoded = DummyDecoder.decode(request) + decoded.headers === Nil + decoded.entity === HttpEntity(dummyDecompress(smallText)) + } + } + + def dummyDecompress(s: String): String = dummyDecompress(ByteString(s, "UTF8")).decodeString("UTF8") + def dummyDecompress(bytes: ByteString): ByteString = DummyDecompressor.decompress(bytes) + + case object DummyDecoder extends Decoder { + val encoding = HttpEncodings.compress + def newDecompressor = DummyDecompressor + } + + case object DummyDecompressor extends Decompressor { + def decompress(buffer: ByteString): ByteString = buffer ++ ByteString("compressed") + } +} diff --git a/akka-http-tests/src/test/scala/akka/http/coding/DeflateSpec.scala b/akka-http-tests/src/test/scala/akka/http/coding/DeflateSpec.scala new file mode 100644 index 0000000000..3ee58e8bdb --- /dev/null +++ b/akka-http-tests/src/test/scala/akka/http/coding/DeflateSpec.scala @@ -0,0 +1,82 @@ +/* + * Copyright (C) 2009-2014 Typesafe Inc. 
+ */ + +package akka.http.coding + +import akka.util.ByteString +import akka.http.util._ +import org.scalatest.WordSpec +import akka.http.model._ +import HttpMethods.POST + +import java.io.ByteArrayOutputStream +import java.util.zip.{ DeflaterOutputStream, InflaterOutputStream } + +class DeflateSpec extends WordSpec with CodecSpecSupport { + + "The Deflate codec" should { + "properly encode a small string" in { + streamInflate(ourDeflate(smallTextBytes)) should readAs(smallText) + } + "properly decode a small string" in { + ourInflate(streamDeflate(smallTextBytes)) should readAs(smallText) + } + "properly round-trip encode/decode a small string" in { + ourInflate(ourDeflate(smallTextBytes)) should readAs(smallText) + } + "properly encode a large string" in { + streamInflate(ourDeflate(largeTextBytes)) should readAs(largeText) + } + "properly decode a large string" in { + ourInflate(streamDeflate(largeTextBytes)) should readAs(largeText) + } + "properly round-trip encode/decode a large string" in { + ourInflate(ourDeflate(largeTextBytes)) should readAs(largeText) + } + "properly round-trip encode/decode an HttpRequest" in { + val request = HttpRequest(POST, entity = HttpEntity(largeText)) + Deflate.decode(Deflate.encode(request)) should equal(request) + } + "provide a better compression ratio than the standard Deflater/Inflater streams" in { + ourDeflate(largeTextBytes).length should be < streamDeflate(largeTextBytes).length + } + "support chunked round-trip encoding/decoding" in { + val chunks = largeTextBytes.grouped(512).toVector + val comp = Deflate.newCompressor + val decomp = Deflate.newDecompressor + val chunks2 = + chunks.map { chunk ⇒ + decomp.decompress(comp.compressAndFlush(chunk)) + } :+ + decomp.decompress(comp.finish()) + chunks2.join should readAs(largeText) + } + "work for any split in prefix + suffix" in { + val compressed = streamDeflate(smallTextBytes) + def tryWithPrefixOfSize(prefixSize: Int): Unit = { + val decomp = Deflate.newDecompressor + val prefix = compressed.take(prefixSize) + val suffix = compressed.drop(prefixSize) + + decomp.decompress(prefix) ++ decomp.decompress(suffix) should readAs(smallText) + } + (0 to compressed.size).foreach(tryWithPrefixOfSize) + } + } + + def ourDeflate(bytes: ByteString): ByteString = Deflate.newCompressor.compressAndFinish(bytes) + def ourInflate(bytes: ByteString): ByteString = Deflate.newDecompressor.decompress(bytes) + + def streamDeflate(bytes: ByteString) = { + val output = new ByteArrayOutputStream() + val dos = new DeflaterOutputStream(output); dos.write(bytes.toArray); dos.close() + ByteString(output.toByteArray) + } + + def streamInflate(bytes: ByteString) = { + val output = new ByteArrayOutputStream() + val ios = new InflaterOutputStream(output); ios.write(bytes.toArray); ios.close() + ByteString(output.toByteArray) + } +} diff --git a/akka-http-tests/src/test/scala/akka/http/coding/EncoderSpec.scala b/akka-http-tests/src/test/scala/akka/http/coding/EncoderSpec.scala new file mode 100644 index 0000000000..2d99c618ce --- /dev/null +++ b/akka-http-tests/src/test/scala/akka/http/coding/EncoderSpec.scala @@ -0,0 +1,45 @@ +/* + * Copyright (C) 2009-2014 Typesafe Inc.
+ */ + +package akka.http.coding + +import akka.util.ByteString +import org.scalatest.WordSpec +import akka.http.model._ +import headers._ +import HttpMethods.POST + +class EncoderSpec extends WordSpec with CodecSpecSupport { + + "An Encoder" should { + "not transform the message if messageFilter returns false" in { + val request = HttpRequest(POST, entity = HttpEntity(smallText.getBytes("UTF8"))) + DummyEncoder.encode(request) should equal(request) + } + "correctly transform the HttpMessage if messageFilter returns true" in { + val request = HttpRequest(POST, entity = HttpEntity(smallText)) + val encoded = DummyEncoder.encode(request) + encoded.headers should equal(List(`Content-Encoding`(DummyEncoder.encoding))) + encoded.entity should equal(HttpEntity(dummyCompress(smallText))) + } + } + + def dummyCompress(s: String): String = dummyCompress(ByteString(s, "UTF8")).utf8String + def dummyCompress(bytes: ByteString): ByteString = DummyCompressor.compressAndFinish(bytes) + + case object DummyEncoder extends Encoder { + val messageFilter = Encoder.DefaultFilter + val encoding = HttpEncodings.compress + def newCompressor = DummyCompressor + } + + case object DummyCompressor extends Compressor { + def compress(input: ByteString) = input ++ ByteString("compressed") + def flush() = ByteString.empty + def finish() = ByteString.empty + + def compressAndFlush(input: ByteString): ByteString = compress(input) + def compressAndFinish(input: ByteString): ByteString = compress(input) + } +} diff --git a/akka-http-tests/src/test/scala/akka/http/coding/GzipSpec.scala b/akka-http-tests/src/test/scala/akka/http/coding/GzipSpec.scala new file mode 100644 index 0000000000..e7bb9b8970 --- /dev/null +++ b/akka-http-tests/src/test/scala/akka/http/coding/GzipSpec.scala @@ -0,0 +1,130 @@ +/* + * Copyright (C) 2009-2014 Typesafe Inc.
+ */ + +package akka.http.coding + +import org.scalatest.WordSpec +import akka.http.model._ +import akka.http.util._ +import HttpMethods.POST + +import java.io.{ InputStream, OutputStream, ByteArrayInputStream, ByteArrayOutputStream } +import java.util.zip.{ DataFormatException, GZIPInputStream, GZIPOutputStream } + +import akka.util.ByteString + +import scala.annotation.tailrec + +class GzipSpec extends WordSpec with CodecSpecSupport { + + "The Gzip codec" should { + "properly encode a small string" in { + streamGunzip(ourGzip(smallTextBytes)) should readAs(smallText) + } + "properly decode a small string" in { + ourGunzip(streamGzip(smallTextBytes)) should readAs(smallText) + } + "properly round-trip encode/decode a small string" in { + ourGunzip(ourGzip(smallTextBytes)) should readAs(smallText) + } + "properly encode a large string" in { + streamGunzip(ourGzip(largeTextBytes)) should readAs(largeText) + } + "properly decode a large string" in { + ourGunzip(streamGzip(largeTextBytes)) should readAs(largeText) + } + "properly round-trip encode/decode a large string" in { + ourGunzip(ourGzip(largeTextBytes)) should readAs(largeText) + } + "properly round-trip encode/decode an HttpRequest" in { + val request = HttpRequest(POST, entity = HttpEntity(largeText)) + Gzip.decode(Gzip.encode(request)) should equal(request) + } + "provide a better compression ratio than the standard Gzip/Gunzip streams" in { + ourGzip(largeTextBytes).length should be < streamGzip(largeTextBytes).length + } + "properly decode concatenated compressions" in { + ourGunzip(Seq(gzip("Hello,"), gzip(" dear "), gzip("User!")).join) should readAs("Hello, dear User!") + } + "throw an error on corrupt input" in { + val ex = the[DataFormatException] thrownBy ourGunzip(corruptGzipContent) + ex.getMessage should equal("invalid literal/length code") + } + "not throw an error if a subsequent block is corrupt" in { + pending // FIXME: should we read as long as possible and only then report an error? That seems somewhat arbitrary. + ourGunzip(Seq(gzip("Hello,"), gzip(" dear "), corruptGzipContent).join) should readAs("Hello, dear ") + } + "decompress in very small chunks" in { + val compressed = gzip("Hello") + val decomp = Gzip.newDecompressor + val result = decomp.decompress(compressed.take(10)) // just the headers + result.size should equal(0) + val data = decomp.decompress(compressed.drop(10)) // the rest + data should readAs("Hello") + } + "support chunked round-trip encoding/decoding" in { + val chunks = largeTextBytes.grouped(512).toVector + val comp = Gzip.newCompressor + val decomp = Gzip.newDecompressor + val chunks2 = + chunks.map { chunk ⇒ decomp.decompress(comp.compressAndFlush(chunk)) } :+ decomp.decompress(comp.finish()) + chunks2.join should readAs(largeText) + } + "work for any split in prefix + suffix" in { + val compressed = streamGzip(smallTextBytes) + def tryWithPrefixOfSize(prefixSize: Int): Unit = { + val decomp = Gzip.newDecompressor + val prefix = compressed.take(prefixSize) + val suffix = compressed.drop(prefixSize) + + decomp.decompress(prefix) ++ decomp.decompress(suffix) should readAs(smallText) + } + (0 to compressed.size).foreach(tryWithPrefixOfSize) + } + "work for chunked compressed data of sizes just above 1024" in { + val comp = new GzipCompressor + val decomp = new GzipDecompressor + + val inputBytes =
ByteString("""{"baseServiceURL":"http://www.acme.com","endpoints":{"assetSearchURL":"/search","showsURL":"/shows","mediaContainerDetailURL":"/container","featuredTapeURL":"/tape","assetDetailURL":"/asset","moviesURL":"/movies","recentlyAddedURL":"/recent","topicsURL":"/topics","scheduleURL":"/schedule"},"urls":{"aboutAweURL":"www.foobar.com"},"channelName":"Cool Stuff","networkId":"netId","slotProfile":"slot_1","brag":{"launchesUntilPrompt":10,"daysUntilPrompt":5,"launchesUntilReminder":5,"daysUntilReminder":2},"feedbackEmailAddress":"feedback@acme.com","feedbackEmailSubject":"Commends from User","splashSponsor":[],"adProvider":{"adProviderProfile":"","adProviderProfileAndroid":"","adProviderNetworkID":0,"adProviderSiteSectionNetworkID":0,"adProviderVideoAssetNetworkID":0,"adProviderSiteSectionCustomID":{},"adProviderServerURL":"","adProviderLiveVideoAssetID":""},"update":[{"forPlatform":"ios","store":{"iTunes":"www.something.com"},"minVer":"1.2.3","notificationVer":"1.2.5"},{"forPlatform":"android","store":{"amazon":"www.something.com","play":"www.something.com"},"minVer":"1.2.3","notificationVer":"1.2.5"}],"tvRatingPolicies":[{"type":"sometype","imageKey":"tv_rating_small","durationMS":15000,"precedence":1},{"type":"someothertype","imageKey":"tv_rating_big","durationMS":15000,"precedence":2}],"exts":{"adConfig":{"globals":{"#{adNetworkID}":"2620","#{ssid}":"usa_tveapp"},"iPad":{"showlist":{"adMobAdUnitID":"/2620/usa_tveapp_ipad/shows","adSize":[{"#{height}":90,"#{width}":728}]},"launch":{"doubleClickCallbackURL":"http://pubads.g.doubleclick.net/gampad/ad?iu=/2620/usa_tveapp_ipad&sz=1x1&t=&c=#{doubleclickrandom}"},"watchwithshowtile":{"adMobAdUnitID":"/2620/usa_tveapp_ipad/watchwithshowtile","adSize":[{"#{height}":120,"#{width}":240}]},"showpage":{"doubleClickCallbackURL":"http://pubads.g.doubleclick.net/gampad/ad?iu=/2620/usa_tveapp_ipad/shows/#{SHOW_NAME}&sz=1x1&t=&c=#{doubleclickrandom}"}},"iPadRetina":{"showlist":{"adMobAdUnitID":"/2620/usa_tveapp_ipad/shows","adSize":[{"#{height}":90,"#{width}":728}]},"launch":{"doubleClickCallbackURL":"http://pubads.g.doubleclick.net/gampad/ad?iu=/2620/usa_tveapp_ipad&sz=1x1&t=&c=#{doubleclickrandom}"},"watchwithshowtile":{"adMobAdUnitID":"/2620/usa_tveapp_ipad/watchwithshowtile","adSize":[{"#{height}":120,"#{width}":240}]},"showpage":{"doubleClickCallbackURL":"http://pubads.g.doubleclick.net/gampad/ad?iu=/2620/usa_tveapp_ipad/shows/#{SHOW_NAME}&sz=1x1&t=&c=#{doubleclickrandom}"}},"iPhone":{"home":{"adMobAdUnitID":"/2620/usa_tveapp_iphone/home","adSize":[{"#{height}":50,"#{width}":300},{"#{height}":50,"#{width}":320}]},"showlist":{"adMobAdUnitID":"/2620/usa_tveapp_iphone/shows","adSize":[{"#{height}":50,"#{width}":300},{"#{height}":50,"#{width}":320}]},"episodepage":{"adMobAdUnitID":"/2620/usa_tveapp_iphone/shows/#{SHOW_NAME}","adSize":[{"#{height}":50,"#{width}":300},{"#{height}":50,"#{width}":320}]},"launch":{"doubleClickCallbackURL":"http://pubads.g.doubleclick.net/gampad/ad?iu=/2620/usa_tveapp_iphone&sz=1x1&t=&c=#{doubleclickrandom}"},"showpage":{"doubleClickCallbackURL":"http://pubads.g.doubleclick.net/gampad/ad?iu=/2620/usa_tveapp_iphone/shows/#{SHOW_NAME}&sz=1x1&t=&c=#{doubleclickrandom}"}},"iPhoneRetina":{"home":{"adMobAdUnitID":"/2620/usa_tveapp_iphone/home","adSize":[{"#{height}":50,"#{width}":300},{"#{height}":50,"#{width}":320}]},"showlist":{"adMobAdUnitID":"/2620/usa_tveapp_iphone/shows","adSize":[{"#{height}":50,"#{width}":300},{"#{height}":50,"#{width}":320}]},"episodepage":{"adMobAdUnitID":"/2620/usa_tveapp_iphone/shows/#{SHOW_NAME}",
"adSize":[{"#{height}":50,"#{width}":300},{"#{height}":50,"#{width}":320}]},"launch":{"doubleClickCallbackURL":"http://pubads.g.doubleclick.net/gampad/ad?iu=/2620/usa_tveapp_iphone&sz=1x1&t=&c=#{doubleclickrandom}"},"showpage":{"doubleClickCallbackURL":"http://pubads.g.doubleclick.net/gampad/ad?iu=/2620/usa_tveapp_iphone/shows/#{SHOW_NAME}&sz=1x1&t=&c=#{doubleclickrandom}"}},"Tablet":{"home":{"adMobAdUnitID":"/2620/usa_tveapp_androidtab/home","adSize":[{"#{height}":90,"#{width}":728},{"#{height}":50,"#{width}":320},{"#{height}":50,"#{width}":300}]},"showlist":{"adMobAdUnitID":"/2620/usa_tveapp_androidtab/shows","adSize":[{"#{height}":90,"#{width}":728},{"#{height}":50,"#{width}":320},{"#{height}":50,"#{width}":300}]},"episodepage":{"adMobAdUnitID":"/2620/usa_tveapp_androidtab/shows/#{SHOW_NAME}","adSize":[{"#{height}":90,"#{width}":728},{"#{height}":50,"#{width}":320},{"#{height}":50,"#{width}":300}]},"launch":{"doubleClickCallbackURL":"http://pubads.g.doubleclick.net/gampad/ad?iu=/2620/usa_tveapp_androidtab&sz=1x1&t=&c=#{doubleclickrandom}"},"showpage":{"doubleClickCallbackURL":"http://pubads.g.doubleclick.net/gampad/ad?iu=/2620/usa_tveapp_androidtab/shows/#{SHOW_NAME}&sz=1x1&t=&c=#{doubleclickrandom}"}},"TabletHD":{"home":{"adMobAdUnitID":"/2620/usa_tveapp_androidtab/home","adSize":[{"#{height}":90,"#{width}":728},{"#{height}":50,"#{width}":320},{"#{height}":50,"#{width}":300}]},"showlist":{"adMobAdUnitID":"/2620/usa_tveapp_androidtab/shows","adSize":[{"#{height}":90,"#{width}":728},{"#{height}":50,"#{width}":320},{"#{height}":50,"#{width}":300}]},"episodepage":{"adMobAdUnitID":"/2620/usa_tveapp_androidtab/shows/#{SHOW_NAME}","adSize":[{"#{height}":90,"#{width}":728},{"#{height}":50,"#{width}":320},{"#{height}":50,"#{width}":300}]},"launch":{"doubleClickCallbackURL":"http://pubads.g.doubleclick.net/gampad/ad?iu=/2620/usa_tveapp_androidtab&sz=1x1&t=&c=#{doubleclickrandom}"},"showpage":{"doubleClickCallbackURL":"http://pubads.g.doubleclick.net/gampad/ad?iu=/2620/usa_tveapp_androidtab/shows/#{SHOW_NAME}&sz=1x1&t=&c=#{doubleclickrandom}"}},"Phone":{"home":{"adMobAdUnitID":"/2620/usa_tveapp_android/home","adSize":[{"#{height}":50,"#{width}":300},{"#{height}":50,"#{width}":320}]},"showlist":{"adMobAdUnitID":"/2620/usa_tveapp_android/shows","adSize":[{"#{height}":50,"#{width}":300},{"#{height}":50,"#{width}":320}]},"episodepage":{"adMobAdUnitID":"/2620/usa_tveapp_android/shows/#{SHOW_NAME}","adSize":[{"#{height}":50,"#{width}":300},{"#{height}":50,"#{width}":320}]},"launch":{"doubleClickCallbackURL":"http://pubads.g.doubleclick.net/gampad/ad?iu=/2620/usa_tveapp_android&sz=1x1&t=&c=#{doubleclickrandom}"},"showpage":{"doubleClickCallbackURL":"http://pubads.g.doubleclick.net/gampad/ad?iu=/2620/usa_tveapp_android/shows/#{SHOW_NAME}&sz=1x1&t=&c=#{doubleclickrandom}"}},"PhoneHD":{"home":{"adMobAdUnitID":"/2620/usa_tveapp_android/home","adSize":[{"#{height}":50,"#{width}":300},{"#{height}":50,"#{width}":320}]},"showlist":{"adMobAdUnitID":"/2620/usa_tveapp_android/shows","adSize":[{"#{height}":50,"#{width}":300},{"#{height}":50,"#{width}":320}]},"episodepage":{"adMobAdUnitID":"/2620/usa_tveapp_android/shows/#{SHOW_NAME}","adSize":[{"#{height}":50,"#{width}":300},{"#{height}":50,"#{width}":320}]},"launch":{"doubleClickCallbackURL":"http://pubads.g.doubleclick.net/gampad/ad?iu=/2620/usa_tveapp_android&sz=1x1&t=&c=#{doubleclickrandom}"},"showpage":{"doubleClickCallbackURL":"http://pubads.g.doubleclick.net/gampad/ad?iu=/2620/usa_tveapp_android/shows/#{SHOW_NAME}&sz=1x1&t=&c=#{doubleclickrandom}"}}}}}""", 
"utf8") + val compressed = comp.compressAndFinish(inputBytes) + + val decompressed = decomp.decompress(compressed) + decompressed should equal(inputBytes) + } + } + + def gzip(s: String) = ourGzip(ByteString(s, "UTF8")) + def ourGzip(bytes: ByteString): ByteString = Gzip.newCompressor.compressAndFinish(bytes) + def ourGunzip(bytes: ByteString): ByteString = Gzip.newDecompressor.decompress(bytes) + + lazy val corruptGzipContent = { + val content = gzip("Hello").toArray + content(14) = 26.toByte + ByteString(content) + } + + def streamGzip(bytes: ByteString): ByteString = { + val output = new ByteArrayOutputStream() + val gos = new GZIPOutputStream(output); gos.write(bytes.toArray); gos.close() + ByteString(output.toByteArray) + } + + def streamGunzip(bytes: ByteString): ByteString = { + val output = new ByteArrayOutputStream() + val input = new GZIPInputStream(new ByteArrayInputStream(bytes.toArray)) + + val buffer = new Array[Byte](500) + @tailrec def copy(from: InputStream, to: OutputStream): Unit = { + val read = from.read(buffer) + if (read >= 0) { + to.write(buffer, 0, read) + copy(from, to) + } + } + + copy(input, output) + ByteString(output.toByteArray) + } + +} diff --git a/akka-http/src/main/scala/akka/http/coding/DataMapper.scala b/akka-http/src/main/scala/akka/http/coding/DataMapper.scala new file mode 100644 index 0000000000..e3dce51d5b --- /dev/null +++ b/akka-http/src/main/scala/akka/http/coding/DataMapper.scala @@ -0,0 +1,35 @@ +/* + * Copyright (C) 2009-2014 Typesafe Inc. + */ + +package akka.http.coding + +import akka.http.model.{ HttpRequest, HttpResponse, ResponseEntity, RequestEntity } +import akka.stream.{ Transformer, FlowMaterializer } +import akka.util.ByteString + +/** An abstraction to transform data bytes of HttpMessages or HttpEntities */ +sealed trait DataMapper[T] { + def transformDataBytes(t: T, transformer: () ⇒ Transformer[ByteString, ByteString])(implicit materializer: FlowMaterializer): T +} +object DataMapper { + implicit val mapRequestEntity: DataMapper[RequestEntity] = + new DataMapper[RequestEntity] { + def transformDataBytes(t: RequestEntity, transformer: () ⇒ Transformer[ByteString, ByteString])(implicit materializer: FlowMaterializer): RequestEntity = + t.transformDataBytes(transformer) + } + implicit val mapResponseEntity: DataMapper[ResponseEntity] = + new DataMapper[ResponseEntity] { + def transformDataBytes(t: ResponseEntity, transformer: () ⇒ Transformer[ByteString, ByteString])(implicit materializer: FlowMaterializer): ResponseEntity = + t.transformDataBytes(transformer) + } + + implicit val mapRequest: DataMapper[HttpRequest] = mapMessage(mapRequestEntity)((m, f) ⇒ m.withEntity(f(m.entity))) + implicit val mapResponse: DataMapper[HttpResponse] = mapMessage(mapResponseEntity)((m, f) ⇒ m.withEntity(f(m.entity))) + + def mapMessage[T, E](entityMapper: DataMapper[E])(mapEntity: (T, E ⇒ E) ⇒ T): DataMapper[T] = + new DataMapper[T] { + def transformDataBytes(t: T, transformer: () ⇒ Transformer[ByteString, ByteString])(implicit materializer: FlowMaterializer): T = + mapEntity(t, entityMapper.transformDataBytes(_, transformer)) + } +} \ No newline at end of file diff --git a/akka-http/src/main/scala/akka/http/coding/Decoder.scala b/akka-http/src/main/scala/akka/http/coding/Decoder.scala new file mode 100644 index 0000000000..46b0e80bdb --- /dev/null +++ b/akka-http/src/main/scala/akka/http/coding/Decoder.scala @@ -0,0 +1,42 @@ +/* + * Copyright (C) 2009-2014 Typesafe Inc. 
+ */ + +package akka.http.coding + +import java.io.{ OutputStream, ByteArrayOutputStream } + +import akka.http.model._ +import akka.http.util.StreamUtils +import akka.stream.{ Transformer, FlowMaterializer } +import akka.util.ByteString +import headers.HttpEncoding + +trait Decoder { + def encoding: HttpEncoding + + def decode[T <: HttpMessage](message: T)(implicit mapper: DataMapper[T], materializer: FlowMaterializer): T#Self = + if (message.headers exists Encoder.isContentEncodingHeader) + decodeData(message).withHeaders(message.headers filterNot Encoder.isContentEncodingHeader) + else message.self + + def decodeData[T](t: T)(implicit mapper: DataMapper[T], materializer: FlowMaterializer): T = + mapper.transformDataBytes(t, newDecodeTransformer) + + def newDecompressor: Decompressor + + def newDecodeTransformer(): Transformer[ByteString, ByteString] = { + val decompressor = newDecompressor + + def decodeChunk(bytes: ByteString): ByteString = decompressor.decompress(bytes) + def finish(): ByteString = ByteString.empty + + StreamUtils.byteStringTransformer(decodeChunk, finish) + } +} + +/** A stateful object representing ongoing decompression. */ +abstract class Decompressor { + /** Decompress the buffer and return decompressed data. */ + def decompress(buffer: ByteString): ByteString +} diff --git a/akka-http/src/main/scala/akka/http/coding/Deflate.scala b/akka-http/src/main/scala/akka/http/coding/Deflate.scala new file mode 100644 index 0000000000..e2cbf5082c --- /dev/null +++ b/akka-http/src/main/scala/akka/http/coding/Deflate.scala @@ -0,0 +1,109 @@ +/* + * Copyright (C) 2009-2014 Typesafe Inc. + */ + +package akka.http.coding + +import java.io.OutputStream +import java.util.zip.{ DataFormatException, ZipException, Inflater, Deflater } +import akka.util.{ ByteStringBuilder, ByteString } + +import scala.annotation.tailrec +import akka.http.util._ +import akka.http.model._ +import headers.HttpEncodings + +class Deflate(val messageFilter: HttpMessage ⇒ Boolean) extends Decoder with Encoder { + val encoding = HttpEncodings.deflate + def newCompressor = new DeflateCompressor + def newDecompressor = new DeflateDecompressor +} + +/** + * An encoder and decoder for the HTTP 'deflate' encoding.
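+ *
+ * A hypothetical usage sketch (names assumed for illustration; an implicit
+ * FlowMaterializer must be in scope, and the DataMapper instances are resolved
+ * from the DataMapper companion):
+ * {{{
+ * val encoded: HttpResponse = Deflate.encode(response)
+ * val decoded: HttpResponse = Deflate.decode(encoded)
+ * }}}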
+ */ +object Deflate extends Deflate(Encoder.DefaultFilter) { + def apply(messageFilter: HttpMessage ⇒ Boolean) = new Deflate(messageFilter) +} + +class DeflateCompressor extends Compressor { + protected lazy val deflater = new Deflater(Deflater.BEST_COMPRESSION, false) + + override final def compressAndFlush(input: ByteString): ByteString = { + val buffer = newTempBuffer(input.size) + + compressWithBuffer(input, buffer) ++ flushWithBuffer(buffer) + } + override final def compressAndFinish(input: ByteString): ByteString = { + val buffer = newTempBuffer(input.size) + + compressWithBuffer(input, buffer) ++ finishWithBuffer(buffer) + } + override final def compress(input: ByteString): ByteString = compressWithBuffer(input, newTempBuffer()) + override final def flush(): ByteString = flushWithBuffer(newTempBuffer()) + override final def finish(): ByteString = finishWithBuffer(newTempBuffer()) + + protected def compressWithBuffer(input: ByteString, buffer: Array[Byte]): ByteString = { + assert(deflater.needsInput()) + deflater.setInput(input.toArray) + drain(buffer) + } + protected def flushWithBuffer(buffer: Array[Byte]): ByteString = { + // trick the deflater into flushing: switch compression level + // FIXME: use proper APIs and SYNC_FLUSH when Java 6 support is dropped + deflater.deflate(EmptyByteArray, 0, 0) + deflater.setLevel(Deflater.NO_COMPRESSION) + val res1 = drain(buffer) + deflater.setLevel(Deflater.BEST_COMPRESSION) + val res2 = drain(buffer) + res1 ++ res2 + } + protected def finishWithBuffer(buffer: Array[Byte]): ByteString = { + deflater.finish() + val res = drain(buffer) + deflater.end() + res + } + + @tailrec + protected final def drain(buffer: Array[Byte], result: ByteStringBuilder = new ByteStringBuilder()): ByteString = { + val len = deflater.deflate(buffer) + if (len > 0) { + result ++= ByteString.fromArray(buffer, 0, len) + drain(buffer, result) + } else { + assert(deflater.needsInput()) + result.result() + } + } + + private def newTempBuffer(size: Int = 65536): Array[Byte] = + // The default size is somewhat arbitrary, we'd like to guess a better value but Deflater/zlib + // is buffering in an unpredictable manner. + // `compress` will only return any data if the buffered compressed data has some size in + // the region of 10000-50000 bytes. + // `flush` and `finish` will return any size depending on the previous input. + // This value will hopefully provide a good compromise between memory churn and + // excessive fragmentation of ByteStrings. 
+ new Array[Byte](size) +} + +class DeflateDecompressor extends Decompressor { + protected lazy val inflater = new Inflater() + + def decompress(buffer: ByteString): ByteString = + try { + inflater.setInput(buffer.toArray) + drain(new Array[Byte](buffer.length * 2)) + } catch { + case e: DataFormatException ⇒ + throw new ZipException(e.getMessage.toOption getOrElse "Invalid ZLIB data format") + } + + @tailrec protected final def drain(buffer: Array[Byte], result: ByteString = ByteString.empty): ByteString = { + val len = inflater.inflate(buffer) + if (len > 0) drain(buffer, result ++ ByteString.fromArray(buffer, 0, len)) + else if (inflater.needsDictionary) throw new ZipException("ZLIB dictionary missing") + else result + } +} diff --git a/akka-http/src/main/scala/akka/http/coding/Encoder.scala b/akka-http/src/main/scala/akka/http/coding/Encoder.scala new file mode 100644 index 0000000000..b148afeb5d --- /dev/null +++ b/akka-http/src/main/scala/akka/http/coding/Encoder.scala @@ -0,0 +1,75 @@ +/* + * Copyright (C) 2009-2014 Typesafe Inc. + */ + +package akka.http.coding + +import java.io.ByteArrayOutputStream +import akka.http.model._ +import akka.http.util.StreamUtils +import akka.stream.{ Transformer, FlowMaterializer } +import akka.util.ByteString +import headers._ + +trait Encoder { + def encoding: HttpEncoding + + def messageFilter: HttpMessage ⇒ Boolean + + def encode[T <: HttpMessage](message: T)(implicit mapper: DataMapper[T], materializer: FlowMaterializer): T#Self = + if (messageFilter(message) && !message.headers.exists(Encoder.isContentEncodingHeader)) + encodeData(message).withHeaders(`Content-Encoding`(encoding) +: message.headers) + else message.self + + def encodeData[T](t: T)(implicit mapper: DataMapper[T], materializer: FlowMaterializer): T = + mapper.transformDataBytes(t, newEncodeTransformer) + + def newCompressor: Compressor + + def newEncodeTransformer(): Transformer[ByteString, ByteString] = { + val compressor = newCompressor + + def encodeChunk(bytes: ByteString): ByteString = compressor.compressAndFlush(bytes) + def finish(): ByteString = compressor.finish() + + StreamUtils.byteStringTransformer(encodeChunk, finish) + } +} + +object Encoder { + val DefaultFilter: HttpMessage ⇒ Boolean = { + case req: HttpRequest ⇒ isCompressible(req) + case res @ HttpResponse(status, _, _, _) ⇒ isCompressible(res) && status.isSuccess + } + private[coding] def isCompressible(msg: HttpMessage): Boolean = + msg.entity.contentType.mediaType.compressible + + private[coding] val isContentEncodingHeader: HttpHeader ⇒ Boolean = _.isInstanceOf[`Content-Encoding`] +} + +/** A stateful object representing ongoing compression. */ +abstract class Compressor { + /** + * Compresses the given input and returns compressed data. The implementation + * may choose to buffer output data to improve compression. Use + * `flush` or `compressAndFlush` to make sure that all input data has been + * compressed and pending output data has been returned. + */ + def compress(input: ByteString): ByteString + + /** + * Flushes any output data and returns the currently remaining compressed data. + */ + def flush(): ByteString + + /** + * Closes this compression stream and returns the remaining compressed data. After + * calling this method, this Compressor cannot be used any further.
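+ *
+ * A hypothetical lifecycle sketch, where `chunk1` and `chunk2` stand for
+ * arbitrary ByteStrings:
+ * {{{
+ * val compressor = Gzip.newCompressor
+ * val part1 = compressor.compressAndFlush(chunk1)
+ * val part2 = compressor.compressAndFlush(chunk2)
+ * val last = compressor.finish() // emits remaining data (and, for Gzip, the trailer)
+ * }}}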
+ */ + def finish(): ByteString + + /** Combines `compress` + `flush` */ + def compressAndFlush(input: ByteString): ByteString + /** Combines `compress` + `finish` */ + def compressAndFinish(input: ByteString): ByteString +} \ No newline at end of file diff --git a/akka-http/src/main/scala/akka/http/coding/Gzip.scala b/akka-http/src/main/scala/akka/http/coding/Gzip.scala new file mode 100644 index 0000000000..7cffe6fe62 --- /dev/null +++ b/akka-http/src/main/scala/akka/http/coding/Gzip.scala @@ -0,0 +1,209 @@ +/* + * Copyright (C) 2009-2014 Typesafe Inc. + */ + +package akka.http.coding + +import java.io.OutputStream +import java.util.zip.{ Inflater, CRC32, ZipException, Deflater } +import akka.util.ByteString + +import scala.annotation.tailrec +import akka.http.model._ +import headers.HttpEncodings + +import scala.util.control.NoStackTrace + +class Gzip(val messageFilter: HttpMessage ⇒ Boolean) extends Decoder with Encoder { + val encoding = HttpEncodings.gzip + def newCompressor = new GzipCompressor + def newDecompressor = new GzipDecompressor +} + +/** + * An encoder and decoder for the HTTP 'gzip' encoding. + */ +object Gzip extends Gzip(Encoder.DefaultFilter) { + def apply(messageFilter: HttpMessage ⇒ Boolean) = new Gzip(messageFilter) +} + +class GzipCompressor extends DeflateCompressor { + override protected lazy val deflater = new Deflater(Deflater.BEST_COMPRESSION, true) + private val checkSum = new CRC32 // CRC32 of uncompressed data + private var headerSent = false + private var bytesRead = 0L + + override protected def compressWithBuffer(input: ByteString, buffer: Array[Byte]): ByteString = { + updateCrc(input) + header() ++ super.compressWithBuffer(input, buffer) + } + override protected def flushWithBuffer(buffer: Array[Byte]): ByteString = header() ++ super.flushWithBuffer(buffer) + override protected def finishWithBuffer(buffer: Array[Byte]): ByteString = super.finishWithBuffer(buffer) ++ trailer() + + private def updateCrc(input: ByteString): Unit = { + checkSum.update(input.toArray) + bytesRead += input.length + } + private def header(): ByteString = + if (!headerSent) { + headerSent = true + GzipDecompressor.Header + } else ByteString.empty + + private def trailer(): ByteString = { + def int32(i: Int): ByteString = ByteString(i, i >> 8, i >> 16, i >> 24) + val crc = checkSum.getValue.toInt + val tot = bytesRead.toInt // truncated to 32bit as specified in https://tools.ietf.org/html/rfc1952#section-2 + val trailer = int32(crc) ++ int32(tot) + + trailer + } +} + +/** A suspendable gzip decompressor */ +class GzipDecompressor extends DeflateDecompressor { + override protected lazy val inflater = new Inflater(true) // disable ZLIB headers + override def decompress(input: ByteString): ByteString = DecompressionStateMachine.run(input) + + import GzipDecompressor._ + object DecompressionStateMachine extends StateMachine { + def initialState = readHeaders + + private def readHeaders(data: ByteString): Action = + // we need at least ID1, ID2, CM and FLG (4 bytes) before parsing can start + if (data.size < 4) SuspendAndRetryWithMoreData + else try { + val reader = new ByteReader(data) + import reader._ + + if (readByte() != 0x1F || readByte() != 0x8B) fail("Not in GZIP format") // check magic header + if (readByte() != 8) fail("Unsupported GZIP compression method") // check compression method + val flags = readByte() + skip(6) // skip MTIME, XFL and OS fields + if ((flags & 4) > 0) skip(readShort()) // skip optional extra fields + if ((flags & 8) > 0) while (readByte() != 0) {} // skip optional file name + if ((flags & 16) >
0) while (readByte() != 0) {} // skip optional file comment + if ((flags & 2) > 0 && crc16(data.take(currentOffset)) != readShort()) fail("Corrupt GZIP header") + + ContinueWith(deflate(new CRC32), remainingData) + } catch { + case ByteReader.NeedMoreData ⇒ SuspendAndRetryWithMoreData + } + + private def deflate(checkSum: CRC32)(data: ByteString): Action = { + assert(inflater.needsInput()) + inflater.setInput(data.toArray) + val output = drain(new Array[Byte](data.length * 2)) + checkSum.update(output.toArray) + if (inflater.finished()) EmitAndContinueWith(output, readTrailer(checkSum), data.takeRight(inflater.getRemaining)) + else EmitAndSuspend(output) + } + + private def readTrailer(checkSum: CRC32)(data: ByteString): Action = + try { + val reader = new ByteReader(data) + import reader._ + + if (readInt() != checkSum.getValue.toInt) fail("Corrupt data (CRC32 checksum error)") + if (readInt() != inflater.getBytesWritten.toInt /* truncated to 32bit */ ) fail("Corrupt GZIP trailer ISIZE") + + inflater.reset() + checkSum.reset() + ContinueWith(initialState, remainingData) // start over to support multiple concatenated gzip streams + } catch { + case ByteReader.NeedMoreData ⇒ SuspendAndRetryWithMoreData + } + + private def fail(msg: String) = throw new ZipException(msg) + + private def crc16(data: ByteString) = { + val crc = new CRC32 + crc.update(data.toArray) + crc.getValue.toInt & 0xFFFF + } + } +} + +/** INTERNAL API */ +private[http] object GzipDecompressor { + // RFC 1952: http://tools.ietf.org/html/rfc1952 section 2.2 + val Header = ByteString( + 31, // ID1 + -117, // ID2 + 8, // CM = Deflate + 0, // FLG + 0, // MTIME 1 + 0, // MTIME 2 + 0, // MTIME 3 + 0, // MTIME 4 + 0, // XFL + 0 // OS + ) + + class ByteReader(input: ByteString) { + import ByteReader.NeedMoreData + + private[this] var off = 0 + + def readByte(): Int = + if (off < input.length) { + val x = input(off) + off += 1 + x.toInt & 0xFF + } else throw NeedMoreData + def readShort(): Int = readByte() | (readByte() << 8) + def readInt(): Int = readShort() | (readShort() << 16) + def skip(numBytes: Int): Unit = + if (off + numBytes <= input.length) off += numBytes + else throw NeedMoreData + def currentOffset: Int = off + def remainingData: ByteString = input.drop(off) + } + object ByteReader { + val NeedMoreData = new Exception with NoStackTrace + } + + /** A simple state machine implementation for suspendable parsing */ + trait StateMachine { + sealed trait Action + /** Cache the current input and suspend to wait for more data */ + case object SuspendAndRetryWithMoreData extends Action + /** Emit some output and suspend in the current state and wait for more data */ + case class EmitAndSuspend(output: ByteString) extends Action + /** Proceed to the nextState and immediately run it with the remainingInput */ + case class ContinueWith(nextState: State, remainingInput: ByteString) extends Action + /** Emit some output and then proceed to the nextState and immediately run it with the remainingInput */ + case class EmitAndContinueWith(output: ByteString, nextState: State, remainingInput: ByteString) extends Action + /** Fail with the given exception and go into the failed state which will throw for any new data */ + case class Fail(cause: Throwable) extends Action + + type State = ByteString ⇒ Action + def initialState: State + + private[this] var state: State = initialState + + /** Run the state machine with the current input */ + @tailrec final def run(input: ByteString, result: ByteString = ByteString.empty): ByteString =
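+ // Interpret the Action produced by the current state: suspend actions stash the
+ // unconsumed input and wait for more data, the Continue* actions recurse
+ // immediately on the remaining input, and all emitted output accumulates in `result`.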
state(input) match { + case SuspendAndRetryWithMoreData ⇒ + val oldState = state + state = { newData ⇒ + state = oldState + oldState(input ++ newData) + } + result + case EmitAndSuspend(output) ⇒ result ++ output + case ContinueWith(next, remainingInput) ⇒ + state = next + run(remainingInput, result) + case EmitAndContinueWith(output, next, remainingInput) ⇒ + state = next + run(remainingInput, result ++ output) + case Fail(cause) ⇒ + state = failState + throw cause + } + + private def failState: State = _ ⇒ throw new IllegalStateException("Trying to reuse failed decompressor.") +} +} diff --git a/akka-http/src/main/scala/akka/http/coding/NoEncoding.scala b/akka-http/src/main/scala/akka/http/coding/NoEncoding.scala new file mode 100644 index 0000000000..5e736372c8 --- /dev/null +++ b/akka-http/src/main/scala/akka/http/coding/NoEncoding.scala @@ -0,0 +1,39 @@ +/* + * Copyright (C) 2009-2014 Typesafe Inc. + */ + +package akka.http.coding + +import akka.http.model._ +import akka.stream.FlowMaterializer +import akka.util.ByteString +import headers.HttpEncodings + +/** + * An encoder and decoder for the HTTP 'identity' encoding. + */ +object NoEncoding extends Decoder with Encoder { + val encoding = HttpEncodings.identity + + override def encode[T <: HttpMessage](message: T)(implicit mapper: DataMapper[T], materializer: FlowMaterializer): T#Self = message.self + override def encodeData[T](t: T)(implicit mapper: DataMapper[T], materializer: FlowMaterializer): T = t + override def decode[T <: HttpMessage](message: T)(implicit mapper: DataMapper[T], materializer: FlowMaterializer): T#Self = message.self + override def decodeData[T](t: T)(implicit mapper: DataMapper[T], materializer: FlowMaterializer): T = t + + val messageFilter: HttpMessage ⇒ Boolean = _ ⇒ false + + def newCompressor = NoEncodingCompressor + def newDecompressor = NoEncodingDecompressor +} + +object NoEncodingCompressor extends Compressor { + def compress(input: ByteString): ByteString = input + def flush() = ByteString.empty + def finish() = ByteString.empty + + def compressAndFlush(input: ByteString): ByteString = input + def compressAndFinish(input: ByteString): ByteString = input +} +object NoEncodingDecompressor extends Decompressor { + def decompress(input: ByteString): ByteString = input +} \ No newline at end of file
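A minimal end-to-end sketch of how the pieces in this diff fit together (a hypothetical example app; the setup mirrors CodecSpecSupport, and only classes added in this diff are assumed):

import scala.concurrent.Await
import scala.concurrent.duration._

import akka.actor.ActorSystem
import akka.stream.FlowMaterializer
import akka.http.model._
import akka.http.coding.Gzip

// Hypothetical example object, not part of the commit.
object CodingExample extends App {
  implicit val system = ActorSystem("coding-example")
  implicit val materializer = FlowMaterializer()
  import system.dispatcher

  val response = HttpResponse(entity = HttpEntity("Hello, dear User!"))

  // Encoder.encode adds the `Content-Encoding: gzip` header and runs a fresh
  // GzipCompressor over the entity bytes via the new transformDataBytes hook ...
  val encoded = Gzip.encode(response)

  // ... and Decoder.decode strips the header again and inflates the data bytes.
  val decoded = Gzip.decode(encoded)

  val strict = Await.result(decoded.entity.toStrict(1.second), 1.second)
  println(strict.data.utf8String) // "Hello, dear User!"

  system.shutdown()
}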