Merge pull request #15984 from spray/w/15910-encoding-infrastructure

+htp #15910 import coding infrastructure from spray
Björn Antonsson 2014-10-03 11:13:06 +02:00
commit be581dffc1
17 changed files with 1073 additions and 3 deletions

View file

@@ -14,11 +14,14 @@ import scala.concurrent.{ Future, ExecutionContext }
import scala.concurrent.duration.FiniteDuration
import scala.collection.immutable
import akka.util.ByteString
import akka.stream.{ TimerTransformer, FlowMaterializer }
import akka.stream.{ Transformer, TimerTransformer, FlowMaterializer }
import akka.stream.scaladsl.Flow
import akka.stream.impl.{ EmptyPublisher, SynchronousPublisherFromIterable }
import akka.http.util._
import japi.JavaMapping.Implicits._
import scala.util.control.NonFatal
/**
* Models the entity (aka "body" or "content") of an HTTP message.
*/
@@ -63,6 +66,16 @@ sealed trait HttpEntity extends japi.HttpEntity {
Flow(dataBytes).timerTransform("toStrict", transformer).toFuture()
}
/**
* Returns a copy of the given entity with the ByteString chunks of this entity transformed by the given transformer.
* For a `Chunked` entity, the chunks will be transformed one by one keeping the chunk metadata (but may introduce an
* extra chunk before the `LastChunk` if `transformer.onTermination` returns additional data).
*
* This method may only throw an exception if the `transformer` function throws an exception while creating the transformer.
* Any other errors are reported through the new entity data stream.
*/
def transformDataBytes(transformer: () ⇒ Transformer[ByteString, ByteString])(implicit materializer: FlowMaterializer): HttpEntity
/**
* Creates a copy of this HttpEntity with the `contentType` overridden with the given one.
*/
@@ -85,10 +98,14 @@ sealed trait BodyPartEntity extends HttpEntity with japi.BodyPartEntity {
/* An entity that can be used for requests */
sealed trait RequestEntity extends HttpEntity with japi.RequestEntity with ResponseEntity {
def withContentType(contentType: ContentType): RequestEntity
override def transformDataBytes(transformer: () ⇒ Transformer[ByteString, ByteString])(implicit materializer: FlowMaterializer): RequestEntity
}
/* An entity that can be used for responses */
sealed trait ResponseEntity extends HttpEntity with japi.ResponseEntity {
def withContentType(contentType: ContentType): ResponseEntity
override def transformDataBytes(transformer: () ⇒ Transformer[ByteString, ByteString])(implicit materializer: FlowMaterializer): ResponseEntity
}
/* An entity that can be used for requests, responses, and body parts */
sealed trait UniversalEntity extends japi.UniversalEntity with MessageEntity with BodyPartEntity {
@@ -136,6 +153,17 @@ object HttpEntity {
override def toStrict(timeout: FiniteDuration)(implicit ec: ExecutionContext, fm: FlowMaterializer) =
FastFuture.successful(this)
override def transformDataBytes(transformer: () ⇒ Transformer[ByteString, ByteString])(implicit materializer: FlowMaterializer): MessageEntity = {
try {
val t = transformer()
val newData = (t.onNext(data) ++ t.onTermination(None)).join
copy(data = newData)
} catch {
case NonFatal(ex) ⇒
Chunked(contentType, StreamUtils.failedPublisher(ex))
}
}
def withContentType(contentType: ContentType): Strict =
if (contentType == this.contentType) this else copy(contentType = contentType)
}
@@ -154,6 +182,12 @@ object HttpEntity {
def dataBytes(implicit fm: FlowMaterializer): Publisher[ByteString] = data
override def transformDataBytes(transformer: () ⇒ Transformer[ByteString, ByteString])(implicit materializer: FlowMaterializer): Chunked = {
val chunks = Flow(data).transform("transformDataBytes-Default", () ⇒ transformer().map(Chunk(_): ChunkStreamPart)).toPublisher()
HttpEntity.Chunked(contentType, chunks)
}
def withContentType(contentType: ContentType): Default =
if (contentType == this.contentType) this else copy(contentType = contentType)
}
@@ -184,6 +218,10 @@ object HttpEntity {
override def isCloseDelimited: Boolean = true
def withContentType(contentType: ContentType): CloseDelimited =
if (contentType == this.contentType) this else copy(contentType = contentType)
override def transformDataBytes(transformer: () ⇒ Transformer[ByteString, ByteString])(implicit materializer: FlowMaterializer): CloseDelimited =
HttpEntity.CloseDelimited(contentType,
Flow(data).transform("transformDataBytes-CloseDelimited", transformer).toPublisher())
}
/**
@@ -196,6 +234,10 @@ object HttpEntity {
override def isIndefiniteLength: Boolean = true
def withContentType(contentType: ContentType): IndefiniteLength =
if (contentType == this.contentType) this else copy(contentType = contentType)
override def transformDataBytes(transformer: () ⇒ Transformer[ByteString, ByteString])(implicit materializer: FlowMaterializer): IndefiniteLength =
HttpEntity.IndefiniteLength(contentType,
Flow(data).transform("transformDataBytes-IndefiniteLength", transformer).toPublisher())
}
/**
@@ -210,6 +252,35 @@ object HttpEntity {
def dataBytes(implicit fm: FlowMaterializer): Publisher[ByteString] =
Flow(chunks).map(_.data).filter(_.nonEmpty).toPublisher()
override def transformDataBytes(transformer: () ⇒ Transformer[ByteString, ByteString])(implicit materializer: FlowMaterializer): Chunked = {
val newChunks =
Flow(chunks).transform("transformDataBytes-Chunked", () ⇒ new Transformer[ChunkStreamPart, ChunkStreamPart] {
val byteTransformer = transformer()
var sentLastChunk = false
override def isComplete: Boolean = byteTransformer.isComplete
def onNext(element: ChunkStreamPart): immutable.Seq[ChunkStreamPart] = element match {
case Chunk(data, ext) ⇒ Chunk(byteTransformer.onNext(data).join, ext) :: Nil
case l: LastChunk ⇒
sentLastChunk = true
Chunk(byteTransformer.onTermination(None).join) :: l :: Nil
}
override def onTermination(e: Option[Throwable]): immutable.Seq[ChunkStreamPart] = {
val remaining =
if (e.isEmpty && !sentLastChunk) byteTransformer.onTermination(None)
else if (e.isDefined /* && sentLastChunk */ ) byteTransformer.onTermination(e)
else Nil
if (remaining.nonEmpty) Chunk(remaining.join) :: Nil
else Nil
}
}).toPublisher()
HttpEntity.Chunked(contentType, newChunks)
}
def withContentType(contentType: ContentType): Chunked =
if (contentType == this.contentType) this else copy(contentType = contentType)
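
A minimal usage sketch of the new transformDataBytes hook, modeled on the duplicateBytesTransformer in HttpEntitySpec further down; the trailer value is illustrative:

import scala.collection.immutable
import akka.stream.Transformer
import akka.util.ByteString

def duplicating(): Transformer[ByteString, ByteString] =
  new Transformer[ByteString, ByteString] {
    def onNext(bs: ByteString): immutable.Seq[ByteString] = Vector(bs ++ bs) // emit each chunk doubled
    override def onTermination(e: Option[Throwable]): immutable.Seq[ByteString] =
      Vector(ByteString("--dup")) // illustrative trailer, as in the spec
  }

// entity.transformDataBytes(duplicating) yields an entity of the same kind
// whose data stream carries the doubled chunks plus the trailer.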

View file

@@ -21,6 +21,7 @@ import FastFuture._
*/
sealed trait HttpMessage extends japi.HttpMessage {
type Self <: HttpMessage
def self: Self
def isRequest: Boolean
def isResponse: Boolean
@@ -134,6 +135,7 @@ final case class HttpRequest(method: HttpMethod = HttpMethods.GET,
"HTTP/1.0 requests must not have a chunked entity")
type Self = HttpRequest
def self = this
override def isRequest = true
override def isResponse = false
@@ -288,6 +290,7 @@ final case class HttpResponse(status: StatusCode = StatusCodes.OK,
entity: ResponseEntity = HttpEntity.Empty,
protocol: HttpProtocol = HttpProtocols.`HTTP/1.1`) extends japi.HttpResponse with HttpMessage {
type Self = HttpResponse
def self = this
override def isRequest = false
override def isResponse = true

View file

@@ -0,0 +1,14 @@
/*
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.http.util
import akka.util.ByteString
/**
* INTERNAL API
*/
private[http] class EnhancedByteStringTraversableOnce(val byteStrings: TraversableOnce[ByteString]) extends AnyVal {
def join: ByteString = byteStrings.foldLeft(ByteString.empty)(_ ++ _)
}
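
For reference, the join semantics in one runnable snippet (plain akka.util.ByteString):

import akka.util.ByteString

val chunks = List(ByteString("ab"), ByteString("cd"), ByteString("ef"))
val joined = chunks.foldLeft(ByteString.empty)(_ ++ _) // exactly what `join` does
assert(joined.utf8String == "abcdef")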

View file

@@ -0,0 +1,54 @@
/*
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.http.util
import akka.stream.impl.ErrorPublisher
import akka.stream.scaladsl.Flow
import akka.stream.{ FlowMaterializer, Transformer }
import akka.util.ByteString
import org.reactivestreams.Publisher
import scala.collection.immutable
/**
* INTERNAL API
*/
private[http] object StreamUtils {
/**
* Maps a transformer by strictly applying the given function to each output element.
*/
def mapTransformer[T, U, V](t: Transformer[T, U], f: U ⇒ V): Transformer[T, V] =
new Transformer[T, V] {
override def isComplete: Boolean = t.isComplete
def onNext(element: T): immutable.Seq[V] = t.onNext(element).map(f)
override def onTermination(e: Option[Throwable]): immutable.Seq[V] = t.onTermination(e).map(f)
override def onError(cause: Throwable): Unit = t.onError(cause)
override def cleanup(): Unit = t.cleanup()
}
/**
* Creates a transformer that will call `f` for each incoming ByteString and output its result. After the complete
* input has been read it will call `finish` once to determine the final ByteString to post to the output.
*/
def byteStringTransformer(f: ByteString ⇒ ByteString, finish: () ⇒ ByteString): Transformer[ByteString, ByteString] =
new Transformer[ByteString, ByteString] {
def onNext(element: ByteString): immutable.Seq[ByteString] = f(element) :: Nil
override def onTermination(e: Option[Throwable]): immutable.Seq[ByteString] =
if (e.isEmpty) {
val last = finish()
if (last.nonEmpty) last :: Nil
else Nil
} else super.onTermination(e)
}
def failedPublisher[T](ex: Throwable): Publisher[T] =
ErrorPublisher(ex).asInstanceOf[Publisher[T]]
}
private[http] class EnhancedTransformer[T, U](val t: Transformer[T, U]) extends AnyVal {
def map[V](f: U ⇒ V): Transformer[T, V] = StreamUtils.mapTransformer(t, f)
}
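
A hedged sketch of the byteStringTransformer contract for strict input, with illustrative names: `f` maps every chunk, `finish` is consulted once after the input ends.

import akka.util.ByteString

def runStrict(chunks: Seq[ByteString],
              f: ByteString ⇒ ByteString,
              finish: () ⇒ ByteString): ByteString =
  chunks.map(f).foldLeft(ByteString.empty)(_ ++ _) ++ finish()

// runStrict(Seq(ByteString("ab"), ByteString("cd")), bs ⇒ bs ++ bs, () ⇒ ByteString("!"))
// yields "ababcdcd!" — the same ordering the streamed transformer produces.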

View file

@@ -30,6 +30,10 @@ package object util {
private[http] implicit def enhanceConfig(config: Config): EnhancedConfig = new EnhancedConfig(config)
private[http] implicit def enhanceString_(s: String): EnhancedString = new EnhancedString(s)
private[http] implicit def enhanceRegex(regex: Regex): EnhancedRegex = new EnhancedRegex(regex)
private[http] implicit def enhanceByteStrings(byteStrings: TraversableOnce[ByteString]): EnhancedByteStringTraversableOnce =
new EnhancedByteStringTraversableOnce(byteStrings)
private[http] implicit def enhanceTransformer[T, U](transformer: Transformer[T, U]): EnhancedTransformer[T, U] =
new EnhancedTransformer(transformer)
private[http] implicit class FlowWithHeadAndTail[T](val underlying: Flow[Publisher[T]]) extends AnyVal {
def headAndTail(implicit fm: FlowMaterializer): Flow[(T, Publisher[T])] =

View file

@@ -7,6 +7,7 @@ package akka.http.model
import java.util.concurrent.TimeoutException
import com.typesafe.config.{ ConfigFactory, Config }
import org.reactivestreams.Publisher
import scala.collection.immutable
import scala.concurrent.{ Promise, Await }
import scala.concurrent.duration._
import org.scalatest.{ BeforeAndAfterAll, MustMatchers, FreeSpec }
@@ -14,10 +15,9 @@ import org.scalatest.matchers.Matcher
import akka.util.ByteString
import akka.actor.ActorSystem
import akka.stream.scaladsl.Flow
import akka.stream.FlowMaterializer
import akka.stream.{ Transformer, FlowMaterializer }
import akka.stream.impl.SynchronousPublisherFromIterable
import akka.http.model.HttpEntity._
import akka.http.util.FastFuture._
class HttpEntitySpec extends FreeSpec with MustMatchers with BeforeAndAfterAll {
val tpe: ContentType = ContentTypes.`application/octet-stream`
@@ -81,6 +81,27 @@ class HttpEntitySpec extends FreeSpec with MustMatchers with BeforeAndAfterAll {
}.getMessage must be("HttpEntity.toStrict timed out after 100 milliseconds while still waiting for outstanding data")
}
}
"support transformDataBytes" - {
"Strict" in {
Strict(tpe, abc) must transformTo(Strict(tpe, doubleChars("abc") ++ trailer))
}
"Default" in {
Default(tpe, 11, publisher(abc, de, fgh, ijk)) must
transformTo(Strict(tpe, doubleChars("abcdefghijk") ++ trailer))
}
"CloseDelimited" in {
CloseDelimited(tpe, publisher(abc, de, fgh, ijk)) must
transformTo(Strict(tpe, doubleChars("abcdefghijk") ++ trailer))
}
"Chunked w/o LastChunk" in {
Chunked(tpe, publisher(Chunk(abc), Chunk(fgh), Chunk(ijk))) must
transformTo(Strict(tpe, doubleChars("abcfghijk") ++ trailer))
}
"Chunked with LastChunk" in {
Chunked(tpe, publisher(Chunk(abc), Chunk(fgh), Chunk(ijk), LastChunk)) must
transformTo(Strict(tpe, doubleChars("abcfghijk") ++ trailer))
}
}
}
def publisher[T](elems: T*) = SynchronousPublisherFromIterable(elems.toList)
@@ -93,4 +114,23 @@ class HttpEntitySpec extends FreeSpec with MustMatchers with BeforeAndAfterAll {
def strictifyTo(strict: Strict): Matcher[HttpEntity] =
equal(strict).matcher[Strict].compose(x ⇒ Await.result(x.toStrict(250.millis), 250.millis))
def transformTo(strict: Strict): Matcher[HttpEntity] =
equal(strict).matcher[Strict].compose { x ⇒
val transformed = x.transformDataBytes(duplicateBytesTransformer)
Await.result(transformed.toStrict(250.millis), 250.millis)
}
def duplicateBytesTransformer(): Transformer[ByteString, ByteString] =
new Transformer[ByteString, ByteString] {
def onNext(bs: ByteString): immutable.Seq[ByteString] =
Vector(doubleChars(bs))
override def onTermination(e: Option[Throwable]): immutable.Seq[ByteString] =
Vector(trailer)
}
def trailer: ByteString = ByteString("--dup")
def doubleChars(bs: ByteString): ByteString = ByteString(bs.flatMap(b ⇒ Seq(b, b)): _*)
def doubleChars(str: String): ByteString = doubleChars(ByteString(str))
}

View file

@@ -0,0 +1,79 @@
/*
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.http.coding
import org.scalatest.{ Suite, BeforeAndAfterAll, Matchers }
import scala.concurrent.duration._
import akka.actor.ActorSystem
import akka.stream.FlowMaterializer
import akka.util.ByteString
trait CodecSpecSupport extends Matchers with BeforeAndAfterAll { self: Suite ⇒
def readAs(string: String, charset: String = "UTF8") = equal(string).matcher[String] compose { (_: ByteString).decodeString(charset) }
def hexDump(bytes: ByteString) = bytes.map("%02x".format(_)).mkString
def fromHexDump(dump: String) = dump.grouped(2).toArray.map(chars ⇒ Integer.parseInt(new String(chars), 16).toByte)
def printBytes(i: Int, id: String) = {
def byte(i: Int) = (i & 0xFF).toHexString
println(id + ": " + byte(i) + ":" + byte(i >> 8) + ":" + byte(i >> 16) + ":" + byte(i >> 24))
i
}
lazy val smallTextBytes = ByteString(smallText, "UTF8")
lazy val largeTextBytes = ByteString(largeText, "UTF8")
val smallText = "Yeah!"
val largeText =
"""Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore
magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd
gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing
elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos
et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor
sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et
dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd
gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet.
Duis autem vel eum iriure dolor in hendrerit in vulputate velit esse molestie consequat, vel illum dolore eu feugiat
nulla facilisis at vero eros et accumsan et iusto odio dignissim qui blandit praesent luptatum zzril delenit augue duis
dolore te feugait nulla facilisi. Lorem ipsum dolor sit amet, consectetuer adipiscing elit, sed diam nonummy nibh
euismod tincidunt ut laoreet dolore magna aliquam erat volutpat.
Ut wisi enim ad minim veniam, quis nostrud exerci tation ullamcorper suscipit lobortis nisl ut aliquip ex ea commodo
consequat. Duis autem vel eum iriure dolor in hendrerit in vulputate velit esse molestie consequat, vel illum dolore eu
feugiat nulla facilisis at vero eros et accumsan et iusto odio dignissim qui blandit praesent luptatum zzril delenit
augue duis dolore te feugait nulla facilisi.
Nam liber tempor cum soluta nobis eleifend option congue nihil imperdiet doming id quod mazim placerat facer possim
assum. Lorem ipsum dolor sit amet, consectetuer adipiscing elit, sed diam nonummy nibh euismod tincidunt ut laoreet
dolore magna aliquam erat volutpat. Ut wisi enim ad minim veniam, quis nostrud exerci tation ullamcorper suscipit
lobortis nisl ut aliquip ex ea commodo consequat.
Duis autem vel eum iriure dolor in hendrerit in vulputate velit esse molestie consequat, vel illum dolore eu feugiat
nulla facilisis.
At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem
ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt
ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum.
Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet,
consetetur sadipscing elitr, At accusam aliquyam diam diam dolore dolores duo eirmod eos erat, et nonumy sed tempor et
et invidunt justo labore Stet clita ea et gubergren, kasd magna no rebum. sanctus sea sed takimata ut vero voluptua.
est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor
invidunt ut labore et dolore magna aliquyam erat.
Consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam
voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus
est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy e""".replace("\r\n", "\n")
implicit val system = ActorSystem(getClass.getSimpleName)
implicit val materializer = FlowMaterializer()
override def afterAll() = {
system.shutdown()
system.awaitTermination(10.seconds)
}
}
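
The hex helpers above are pure functions; a quick runnable round-trip (values illustrative):

import akka.util.ByteString

val bytes = ByteString(0x1f, 0x8b, 0x08)
val dump  = bytes.map("%02x".format(_)).mkString // "1f8b08"
val back  = ByteString(dump.grouped(2).toArray.map(s ⇒ Integer.parseInt(s, 16).toByte))
assert(back == bytes)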

View file

@@ -0,0 +1,39 @@
/*
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.http.coding
import akka.util.ByteString
import org.scalatest.WordSpec
import akka.http.model._
import headers._
import HttpMethods.POST
class DecoderSpec extends WordSpec with CodecSpecSupport {
"A Decoder" should {
"not transform the message if it doesn't contain a Content-Encoding header" in {
val request = HttpRequest(POST, entity = HttpEntity(smallText))
DummyDecoder.decode(request) === request
}
"correctly transform the message if it contains a Content-Encoding header" in {
val request = HttpRequest(POST, entity = HttpEntity(smallText), headers = List(`Content-Encoding`(DummyDecoder.encoding)))
val decoded = DummyDecoder.decode(request)
decoded.headers === Nil
decoded.entity === HttpEntity(dummyDecompress(smallText))
}
}
def dummyDecompress(s: String): String = dummyDecompress(ByteString(s, "UTF8")).decodeString("UTF8")
def dummyDecompress(bytes: ByteString): ByteString = DummyDecompressor.decompress(bytes)
case object DummyDecoder extends Decoder {
val encoding = HttpEncodings.compress
def newDecompressor = DummyDecompressor
}
case object DummyDecompressor extends Decompressor {
def decompress(buffer: ByteString): ByteString = buffer ++ ByteString("compressed")
}
}

View file

@@ -0,0 +1,82 @@
/*
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.http.coding
import akka.util.ByteString
import akka.http.util._
import org.scalatest.WordSpec
import akka.http.model._
import HttpMethods.POST
import java.io.ByteArrayOutputStream
import java.util.zip.{ DeflaterOutputStream, InflaterOutputStream }
class DeflateSpec extends WordSpec with CodecSpecSupport {
"The Deflate codec" should {
"properly encode a small string" in {
streamInflate(ourDeflate(smallTextBytes)) should readAs(smallText)
}
"properly decode a small string" in {
ourInflate(streamDeflate(smallTextBytes)) should readAs(smallText)
}
"properly round-trip encode/decode a small string" in {
ourInflate(ourDeflate(smallTextBytes)) should readAs(smallText)
}
"properly encode a large string" in {
streamInflate(ourDeflate(largeTextBytes)) should readAs(largeText)
}
"properly decode a large string" in {
ourInflate(streamDeflate(largeTextBytes)) should readAs(largeText)
}
"properly round-trip encode/decode a large string" in {
ourInflate(ourDeflate(largeTextBytes)) should readAs(largeText)
}
"properly round-trip encode/decode an HttpRequest" in {
val request = HttpRequest(POST, entity = HttpEntity(largeText))
Deflate.decode(Deflate.encode(request)) should equal(request)
}
"provide a better compression ratio than the standard Deflater/Inflater streams" in {
ourDeflate(largeTextBytes).length should be < streamDeflate(largeTextBytes).length
}
"support chunked round-trip encoding/decoding" in {
val chunks = largeTextBytes.grouped(512).toVector
val comp = Deflate.newCompressor
val decomp = Deflate.newDecompressor
val chunks2 =
chunks.map { chunk ⇒
decomp.decompress(comp.compressAndFlush(chunk))
} :+
decomp.decompress(comp.finish())
chunks2.join should readAs(largeText)
}
"works for any split in prefix + suffix" in {
val compressed = streamDeflate(smallTextBytes)
def tryWithPrefixOfSize(prefixSize: Int): Unit = {
val decomp = Deflate.newDecompressor
val prefix = compressed.take(prefixSize)
val suffix = compressed.drop(prefixSize)
decomp.decompress(prefix) ++ decomp.decompress(suffix) should readAs(smallText)
}
(0 to compressed.size).foreach(tryWithPrefixOfSize)
}
}
def ourDeflate(bytes: ByteString): ByteString = Deflate.newCompressor.compressAndFinish(bytes)
def ourInflate(bytes: ByteString): ByteString = Deflate.newDecompressor.decompress(bytes)
def streamDeflate(bytes: ByteString) = {
val output = new ByteArrayOutputStream()
val dos = new DeflaterOutputStream(output); dos.write(bytes.toArray); dos.close()
ByteString(output.toByteArray)
}
def streamInflate(bytes: ByteString) = {
val output = new ByteArrayOutputStream()
val ios = new InflaterOutputStream(output); ios.write(bytes.toArray); ios.close()
ByteString(output.toByteArray)
}
}

View file

@@ -0,0 +1,45 @@
/*
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.http.coding
import akka.util.ByteString
import org.scalatest.WordSpec
import akka.http.model._
import headers._
import HttpMethods.POST
class EncoderSpec extends WordSpec with CodecSpecSupport {
"An Encoder" should {
"not transform the message if messageFilter returns false" in {
val request = HttpRequest(POST, entity = HttpEntity(smallText.getBytes("UTF8")))
DummyEncoder.encode(request) === request
}
"correctly transform the HttpMessage if messageFilter returns true" in {
val request = HttpRequest(POST, entity = HttpEntity(smallText))
val encoded = DummyEncoder.encode(request)
encoded.headers === List(`Content-Encoding`(DummyEncoder.encoding))
encoded.entity === HttpEntity(dummyCompress(smallText))
}
}
def dummyCompress(s: String): String = dummyCompress(ByteString(s, "UTF8")).utf8String
def dummyCompress(bytes: ByteString): ByteString = DummyCompressor.compressAndFinish(bytes)
case object DummyEncoder extends Encoder {
val messageFilter = Encoder.DefaultFilter
val encoding = HttpEncodings.compress
def newCompressor = DummyCompressor
}
case object DummyCompressor extends Compressor {
def compress(input: ByteString) = input ++ ByteString("compressed")
def flush() = ByteString.empty
def finish() = ByteString.empty
def compressAndFlush(input: ByteString): ByteString = compress(input)
def compressAndFinish(input: ByteString): ByteString = compress(input)
}
}

File diff suppressed because one or more lines are too long

View file

@@ -0,0 +1,35 @@
/*
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.http.coding
import akka.http.model.{ HttpRequest, HttpResponse, ResponseEntity, RequestEntity }
import akka.stream.{ Transformer, FlowMaterializer }
import akka.util.ByteString
/** An abstraction to transform data bytes of HttpMessages or HttpEntities */
sealed trait DataMapper[T] {
def transformDataBytes(t: T, transformer: () ⇒ Transformer[ByteString, ByteString])(implicit materializer: FlowMaterializer): T
}
object DataMapper {
implicit val mapRequestEntity: DataMapper[RequestEntity] =
new DataMapper[RequestEntity] {
def transformDataBytes(t: RequestEntity, transformer: () ⇒ Transformer[ByteString, ByteString])(implicit materializer: FlowMaterializer): RequestEntity =
t.transformDataBytes(transformer)
}
implicit val mapResponseEntity: DataMapper[ResponseEntity] =
new DataMapper[ResponseEntity] {
def transformDataBytes(t: ResponseEntity, transformer: () ⇒ Transformer[ByteString, ByteString])(implicit materializer: FlowMaterializer): ResponseEntity =
t.transformDataBytes(transformer)
}
implicit val mapRequest: DataMapper[HttpRequest] = mapMessage(mapRequestEntity)((m, f) ⇒ m.withEntity(f(m.entity)))
implicit val mapResponse: DataMapper[HttpResponse] = mapMessage(mapResponseEntity)((m, f) ⇒ m.withEntity(f(m.entity)))
def mapMessage[T, E](entityMapper: DataMapper[E])(mapEntity: (T, E ⇒ E) ⇒ T): DataMapper[T] =
new DataMapper[T] {
def transformDataBytes(t: T, transformer: () ⇒ Transformer[ByteString, ByteString])(implicit materializer: FlowMaterializer): T =
mapEntity(t, entityMapper.transformDataBytes(_, transformer))
}
}
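
A hedged sketch of the typeclass in use: one polymorphic helper covers every T with a DataMapper instance, so the same code path serves request and response entities. The doubling transformer is purely illustrative, and the code assumes it lives under akka.http (StreamUtils is private[http]) with an implicit FlowMaterializer in scope:

import akka.http.util.StreamUtils
import akka.stream.FlowMaterializer
import akka.util.ByteString

def doubleBytes[T](t: T)(implicit mapper: DataMapper[T], mat: FlowMaterializer): T =
  mapper.transformDataBytes(t, () ⇒ StreamUtils.byteStringTransformer(bs ⇒ bs ++ bs, () ⇒ ByteString.empty))

// doubleBytes(request.entity) and doubleBytes(response.entity) both compile,
// each resolving the matching implicit instance above.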

View file

@@ -0,0 +1,42 @@
/*
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.http.coding
import java.io.{ OutputStream, ByteArrayOutputStream }
import akka.http.model._
import akka.http.util.StreamUtils
import akka.stream.{ Transformer, FlowMaterializer }
import akka.util.ByteString
import headers.HttpEncoding
trait Decoder {
def encoding: HttpEncoding
def decode[T <: HttpMessage](message: T)(implicit mapper: DataMapper[T], materializer: FlowMaterializer): T#Self =
if (message.headers exists Encoder.isContentEncodingHeader)
decodeData(message).withHeaders(message.headers filterNot Encoder.isContentEncodingHeader)
else message.self
def decodeData[T](t: T)(implicit mapper: DataMapper[T], materializer: FlowMaterializer): T =
mapper.transformDataBytes(t, newDecodeTransformer)
def newDecompressor: Decompressor
def newDecodeTransformer(): Transformer[ByteString, ByteString] = {
val decompressor = newDecompressor
def decodeChunk(bytes: ByteString): ByteString = decompressor.decompress(bytes)
def finish(): ByteString = ByteString.empty
StreamUtils.byteStringTransformer(decodeChunk, finish)
}
}
/** A stateful object representing ongoing decompression. */
abstract class Decompressor {
/** Decompress the buffer and return decompressed data. */
def decompress(buffer: ByteString): ByteString
}
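
A hedged sketch of decode in action, built only from pieces of this commit (the Gzip codec appears further down; assumes an implicit FlowMaterializer and the implicit DataMapper instances above in scope):

import akka.http.model._
import akka.http.model.headers._
import akka.util.ByteString

val body    = Gzip.newCompressor.compressAndFinish(ByteString("payload"))
val request = HttpRequest(
  headers = List(`Content-Encoding`(HttpEncodings.gzip)),
  entity  = HttpEntity(ContentTypes.`application/octet-stream`, body))
val decoded = Gzip.decode(request) // strips the Content-Encoding header, inflates the body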

View file

@@ -0,0 +1,109 @@
/*
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.http.coding
import java.io.OutputStream
import java.util.zip.{ DataFormatException, ZipException, Inflater, Deflater }
import akka.util.{ ByteStringBuilder, ByteString }
import scala.annotation.tailrec
import akka.http.util._
import akka.http.model._
import headers.HttpEncodings
class Deflate(val messageFilter: HttpMessage ⇒ Boolean) extends Decoder with Encoder {
val encoding = HttpEncodings.deflate
def newCompressor = new DeflateCompressor
def newDecompressor = new DeflateDecompressor
}
/**
* An encoder and decoder for the HTTP 'deflate' encoding.
*/
object Deflate extends Deflate(Encoder.DefaultFilter) {
def apply(messageFilter: HttpMessage ⇒ Boolean) = new Deflate(messageFilter)
}
class DeflateCompressor extends Compressor {
protected lazy val deflater = new Deflater(Deflater.BEST_COMPRESSION, false)
override final def compressAndFlush(input: ByteString): ByteString = {
val buffer = newTempBuffer(input.size)
compressWithBuffer(input, buffer) ++ flushWithBuffer(buffer)
}
override final def compressAndFinish(input: ByteString): ByteString = {
val buffer = newTempBuffer(input.size)
compressWithBuffer(input, buffer) ++ finishWithBuffer(buffer)
}
override final def compress(input: ByteString): ByteString = compressWithBuffer(input, newTempBuffer())
override final def flush(): ByteString = flushWithBuffer(newTempBuffer())
override final def finish(): ByteString = finishWithBuffer(newTempBuffer())
protected def compressWithBuffer(input: ByteString, buffer: Array[Byte]): ByteString = {
assert(deflater.needsInput())
deflater.setInput(input.toArray)
drain(buffer)
}
protected def flushWithBuffer(buffer: Array[Byte]): ByteString = {
// trick the deflater into flushing: switch compression level
// FIXME: use proper APIs and SYNC_FLUSH when Java 6 support is dropped
deflater.deflate(EmptyByteArray, 0, 0)
deflater.setLevel(Deflater.NO_COMPRESSION)
val res1 = drain(buffer)
deflater.setLevel(Deflater.BEST_COMPRESSION)
val res2 = drain(buffer)
res1 ++ res2
}
protected def finishWithBuffer(buffer: Array[Byte]): ByteString = {
deflater.finish()
val res = drain(buffer)
deflater.end()
res
}
@tailrec
protected final def drain(buffer: Array[Byte], result: ByteStringBuilder = new ByteStringBuilder()): ByteString = {
val len = deflater.deflate(buffer)
if (len > 0) {
result ++= ByteString.fromArray(buffer, 0, len)
drain(buffer, result)
} else {
assert(deflater.needsInput())
result.result()
}
}
private def newTempBuffer(size: Int = 65536): Array[Byte] =
// The default size is somewhat arbitrary, we'd like to guess a better value but Deflater/zlib
// is buffering in an unpredictable manner.
// `compress` will only return any data if the buffered compressed data has some size in
// the region of 10000-50000 bytes.
// `flush` and `finish` will return any size depending on the previous input.
// This value will hopefully provide a good compromise between memory churn and
// excessive fragmentation of ByteStrings.
new Array[Byte](size)
}
class DeflateDecompressor extends Decompressor {
protected lazy val inflater = new Inflater()
def decompress(buffer: ByteString): ByteString =
try {
inflater.setInput(buffer.toArray)
drain(new Array[Byte](buffer.length * 2))
} catch {
case e: DataFormatException ⇒
throw new ZipException(e.getMessage.toOption getOrElse "Invalid ZLIB data format")
}
@tailrec protected final def drain(buffer: Array[Byte], result: ByteString = ByteString.empty): ByteString = {
val len = inflater.inflate(buffer)
if (len > 0) drain(buffer, result ++ ByteString.fromArray(buffer, 0, len))
else if (inflater.needsDictionary) throw new ZipException("ZLIB dictionary missing")
else result
}
}
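
The DeflateSpec above exercises an important property of this decompressor: compressed input may arrive in arbitrary slices, because the Inflater keeps its state across decompress calls. A hedged sketch (assumes the classes above):

import akka.util.ByteString

val compressed = Deflate.newCompressor.compressAndFinish(ByteString("Yeah!"))
val decomp     = Deflate.newDecompressor
val out        = decomp.decompress(compressed.take(3)) ++ decomp.decompress(compressed.drop(3))
assert(out.utf8String == "Yeah!")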

View file

@@ -0,0 +1,75 @@
/*
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.http.coding
import java.io.ByteArrayOutputStream
import akka.http.model._
import akka.http.util.StreamUtils
import akka.stream.{ Transformer, FlowMaterializer }
import akka.util.ByteString
import headers._
trait Encoder {
def encoding: HttpEncoding
def messageFilter: HttpMessage ⇒ Boolean
def encode[T <: HttpMessage](message: T)(implicit mapper: DataMapper[T], materializer: FlowMaterializer): T#Self =
if (messageFilter(message) && !message.headers.exists(Encoder.isContentEncodingHeader))
encodeData(message).withHeaders(`Content-Encoding`(encoding) +: message.headers)
else message.self
def encodeData[T](t: T)(implicit mapper: DataMapper[T], materializer: FlowMaterializer): T =
mapper.transformDataBytes(t, newEncodeTransformer)
def newCompressor: Compressor
def newEncodeTransformer(): Transformer[ByteString, ByteString] = {
val compressor = newCompressor
def encodeChunk(bytes: ByteString): ByteString = compressor.compressAndFlush(bytes)
def finish(): ByteString = compressor.finish()
StreamUtils.byteStringTransformer(encodeChunk, finish)
}
}
object Encoder {
val DefaultFilter: HttpMessage ⇒ Boolean = {
case req: HttpRequest ⇒ isCompressible(req)
case res @ HttpResponse(status, _, _, _) ⇒ isCompressible(res) && status.isSuccess
}
private[coding] def isCompressible(msg: HttpMessage): Boolean =
msg.entity.contentType.mediaType.compressible
private[coding] val isContentEncodingHeader: HttpHeader ⇒ Boolean = _.isInstanceOf[`Content-Encoding`]
}
/** A stateful object representing ongoing compression. */
abstract class Compressor {
/**
* Compresses the given input and returns compressed data. The implementation
* can and will choose to buffer output data to improve compression. Use
* `flush` or `compressAndFlush` to make sure that all input data has been
* compressed and pending output data has been returned.
*/
def compress(input: ByteString): ByteString
/**
* Flushes any output data and returns the currently remaining compressed data.
*/
def flush(): ByteString
/**
* Closes this compressed stream and returns the remaining compressed data. After
* calling this method, this Compressor cannot be used any further.
*/
def finish(): ByteString
/** Combines `compress` + `flush` */
def compressAndFlush(input: ByteString): ByteString
/** Combines `compress` + `finish` */
def compressAndFinish(input: ByteString): ByteString
}
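
A hedged sketch of the Compressor contract in use, with the Deflate implementation from above (how the output is sliced across the three calls is implementation-dependent):

import akka.util.ByteString

val comp = Deflate.newCompressor
val a = comp.compress(ByteString("hello")) // may be empty: output can stay buffered
val b = comp.flush()                       // forces pending output out
val c = comp.finish()                      // final bytes; the compressor is now closed
val compressed = a ++ b ++ c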

View file

@@ -0,0 +1,209 @@
/*
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.http.coding
import java.io.OutputStream
import java.util.zip.{ Inflater, CRC32, ZipException, Deflater }
import akka.util.ByteString
import scala.annotation.tailrec
import akka.http.model._
import headers.HttpEncodings
import scala.util.control.NoStackTrace
class Gzip(val messageFilter: HttpMessage ⇒ Boolean) extends Decoder with Encoder {
val encoding = HttpEncodings.gzip
def newCompressor = new GzipCompressor
def newDecompressor = new GzipDecompressor
}
/**
* An encoder and decoder for the HTTP 'gzip' encoding.
*/
object Gzip extends Gzip(Encoder.DefaultFilter) {
def apply(messageFilter: HttpMessage ⇒ Boolean) = new Gzip(messageFilter)
}
class GzipCompressor extends DeflateCompressor {
override protected lazy val deflater = new Deflater(Deflater.BEST_COMPRESSION, true)
private val checkSum = new CRC32 // CRC32 of uncompressed data
private var headerSent = false
private var bytesRead = 0L
override protected def compressWithBuffer(input: ByteString, buffer: Array[Byte]): ByteString = {
updateCrc(input)
header() ++ super.compressWithBuffer(input, buffer)
}
override protected def flushWithBuffer(buffer: Array[Byte]): ByteString = header() ++ super.flushWithBuffer(buffer)
override protected def finishWithBuffer(buffer: Array[Byte]): ByteString = super.finishWithBuffer(buffer) ++ trailer()
private def updateCrc(input: ByteString): Unit = {
checkSum.update(input.toArray)
bytesRead += input.length
}
private def header(): ByteString =
if (!headerSent) {
headerSent = true
GzipDecompressor.Header
} else ByteString.empty
private def trailer(): ByteString = {
def int32(i: Int): ByteString = ByteString(i, i >> 8, i >> 16, i >> 24)
val crc = checkSum.getValue.toInt
val tot = bytesRead.toInt // truncated to 32bit as specified in https://tools.ietf.org/html/rfc1952#section-2
val trailer = int32(crc) ++ int32(tot)
trailer
}
}
/** A suspendable gzip decompressor */
class GzipDecompressor extends DeflateDecompressor {
override protected lazy val inflater = new Inflater(true) // disable ZLIB headers
override def decompress(input: ByteString): ByteString = DecompressionStateMachine.run(input)
import GzipDecompressor._
object DecompressionStateMachine extends StateMachine {
def initialState = readHeaders
private def readHeaders(data: ByteString): Action =
// header has at least size 3
if (data.size < 4) SuspendAndRetryWithMoreData
else try {
val reader = new ByteReader(data)
import reader._
if (readByte() != 0x1F || readByte() != 0x8B) fail("Not in GZIP format") // check magic header
if (readByte() != 8) fail("Unsupported GZIP compression method") // check compression method
val flags = readByte()
skip(6) // skip MTIME, XFL and OS fields
if ((flags & 4) > 0) skip(readShort()) // skip optional extra fields
if ((flags & 8) > 0) while (readByte() != 0) {} // skip optional file name
if ((flags & 16) > 0) while (readByte() != 0) {} // skip optional file comment
if ((flags & 2) > 0 && crc16(data.take(currentOffset)) != readShort()) fail("Corrupt GZIP header")
ContinueWith(deflate(new CRC32), remainingData)
} catch {
case ByteReader.NeedMoreData ⇒ SuspendAndRetryWithMoreData
}
private def deflate(checkSum: CRC32)(data: ByteString): Action = {
assert(inflater.needsInput())
inflater.setInput(data.toArray)
val output = drain(new Array[Byte](data.length * 2))
checkSum.update(output.toArray)
if (inflater.finished()) EmitAndContinueWith(output, readTrailer(checkSum), data.takeRight(inflater.getRemaining))
else EmitAndSuspend(output)
}
private def readTrailer(checkSum: CRC32)(data: ByteString): Action =
try {
val reader = new ByteReader(data)
import reader._
if (readInt() != checkSum.getValue.toInt) fail("Corrupt data (CRC32 checksum error)")
if (readInt() != inflater.getBytesWritten.toInt /* truncated to 32bit */ ) fail("Corrupt GZIP trailer ISIZE")
inflater.reset()
checkSum.reset()
ContinueWith(initialState, remainingData) // start over to support multiple concatenated gzip streams
} catch {
case ByteReader.NeedMoreData ⇒ SuspendAndRetryWithMoreData
}
private def fail(msg: String) = Fail(new ZipException(msg))
private def crc16(data: ByteString) = {
val crc = new CRC32
crc.update(data.toArray)
crc.getValue.toInt & 0xFFFF
}
}
}
/** INTERNAL API */
private[http] object GzipDecompressor {
// RFC 1952: http://tools.ietf.org/html/rfc1952 section 2.2
val Header = ByteString(
31, // ID1
-117, // ID2
8, // CM = Deflate
0, // FLG
0, // MTIME 1
0, // MTIME 2
0, // MTIME 3
0, // MTIME 4
0, // XFL
0 // OS
)
class ByteReader(input: ByteString) {
import ByteReader.NeedMoreData
private[this] var off = 0
def readByte(): Int =
if (off < input.length) {
val x = input(off)
off += 1
x.toInt & 0xFF
} else throw NeedMoreData
def readShort(): Int = readByte() | (readByte() << 8)
def readInt(): Int = readShort() | (readShort() << 16)
def skip(numBytes: Int): Unit =
if (off + numBytes <= input.length) off += numBytes
else throw NeedMoreData
def currentOffset: Int = off
def remainingData: ByteString = input.drop(off)
}
object ByteReader {
val NeedMoreData = new Exception with NoStackTrace
}
/** A simple state machine implementation for suspendable parsing */
trait StateMachine {
sealed trait Action
/** Cache the current input and suspend to wait for more data */
case object SuspendAndRetryWithMoreData extends Action
/** Emit some output and suspend in the current state and wait for more data */
case class EmitAndSuspend(output: ByteString) extends Action
/** Proceed to the nextState and immediately run it with the remainingInput */
case class ContinueWith(nextState: State, remainingInput: ByteString) extends Action
/** Emit some output and then proceed to the nextState and immediately run it with the remainingInput */
case class EmitAndContinueWith(output: ByteString, nextState: State, remainingInput: ByteString) extends Action
/** Fail with the given exception and go into the failed state which will throw for any new data */
case class Fail(cause: Throwable) extends Action
type State = ByteString ⇒ Action
def initialState: State
private[this] var state: State = initialState
/** Run the state machine with the current input */
@tailrec final def run(input: ByteString, result: ByteString = ByteString.empty): ByteString =
state(input) match {
case SuspendAndRetryWithMoreData ⇒
val oldState = state
state = { newData ⇒
state = oldState
oldState(input ++ newData)
}
result
case EmitAndSuspend(output) ⇒ result ++ output
case ContinueWith(next, remainingInput) ⇒
state = next
run(remainingInput, result)
case EmitAndContinueWith(output, next, remainingInput) ⇒
state = next
run(remainingInput, result ++ output)
case Fail(cause) ⇒
state = failState
throw cause
}
private def failState: State = _ ⇒ throw new IllegalStateException("Trying to reuse failed decompressor.")
}
}
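
For reference, a runnable sketch of the RFC 1952 trailer that GzipCompressor emits and readTrailer verifies: the CRC32 of the uncompressed data followed by its length, both as little-endian 32-bit values (the length truncated modulo 2^32, per the RFC):

import java.util.zip.CRC32
import akka.util.ByteString

def gzipTrailer(uncompressed: Array[Byte]): ByteString = {
  def int32(i: Int): ByteString = ByteString(i, i >> 8, i >> 16, i >> 24) // little-endian
  val crc = new CRC32
  crc.update(uncompressed)
  int32(crc.getValue.toInt) ++ int32(uncompressed.length) // CRC32, then ISIZE
}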

View file

@@ -0,0 +1,39 @@
/*
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.http.coding
import akka.http.model._
import akka.stream.FlowMaterializer
import akka.util.ByteString
import headers.HttpEncodings
/**
* An encoder and decoder for the HTTP 'identity' encoding.
*/
object NoEncoding extends Decoder with Encoder {
val encoding = HttpEncodings.identity
override def encode[T <: HttpMessage](message: T)(implicit mapper: DataMapper[T], materializer: FlowMaterializer): T#Self = message.self
override def encodeData[T](t: T)(implicit mapper: DataMapper[T], materializer: FlowMaterializer): T = t
override def decode[T <: HttpMessage](message: T)(implicit mapper: DataMapper[T], materializer: FlowMaterializer): T#Self = message.self
override def decodeData[T](t: T)(implicit mapper: DataMapper[T], materializer: FlowMaterializer): T = t
val messageFilter: HttpMessage ⇒ Boolean = _ ⇒ false
def newCompressor = NoEncodingCompressor
def newDecompressor = NoEncodingDecompressor
}
object NoEncodingCompressor extends Compressor {
def compress(input: ByteString): ByteString = input
def flush() = ByteString.empty
def finish() = ByteString.empty
def compressAndFlush(input: ByteString): ByteString = input
def compressAndFinish(input: ByteString): ByteString = input
}
object NoEncodingDecompressor extends Decompressor {
def decompress(input: ByteString): ByteString = input
}