+htp #16516 allow setting maximum ByteString chunk size for decoding operations

This commit is contained in:
Johannes Rudolph 2014-12-12 10:27:01 +01:00
parent 29d7a041f6
commit fd8e0225eb
5 changed files with 21 additions and 14 deletions

View file

@ -70,9 +70,9 @@ private[http] object HttpServer {
case (_, src) src.runWith(BlackholeSink)
}.collect {
case r: HttpRequest r
}.buffer(1, OverflowStrategy.backpressure)
// FIXME #16583 it is unclear why this is needed, some element probably does not propagate demand eagerly enough
// the failing test would be HttpServerSpec
}.buffer(1, OverflowStrategy.backpressure)
// FIXME #16583 it is unclear why this is needed, some element probably does not propagate demand eagerly enough
// the failing test would be HttpServerSpec
// we need to make sure that only one element per incoming request is queueing up in front of
// the bypassMerge.bypassInput. Otherwise the rising backpressure against the bypassFanout

View file

@ -17,7 +17,7 @@ import akka.http.util._
import akka.http.model.HttpMethods._
import akka.http.model.{ HttpEntity, HttpRequest }
import akka.stream.scaladsl.Source
import akka.stream.scaladsl.{ Sink, Source }
import akka.util.ByteString
import scala.util.control.NoStackTrace
@ -104,9 +104,9 @@ abstract class CoderSpec extends WordSpec with CodecSpecSupport with Inspectors
val compressed = streamEncode(ByteString(array))
val limit = 10000
val resultBs =
Source.singleton(compressed)
Source.single(compressed)
.via(Coder.withMaxBytesPerChunk(limit).decoderFlow)
.collectAll
.grouped(4200).runWith(Sink.head)
.awaitResult(1.second)
forAll(resultBs) { bs

View file

@ -32,4 +32,8 @@ akka.http.routing {
# The maximum number of allowed ranges per request.
# Requests with more ranges will be rejected due to DOS suspicion.
range-count-limit = 16
# The maximum number of bytes per ByteString a decoding directive will produce
# for an entity data stream.
decode-max-bytes-per-chunk = 1m
}

View file

@ -13,7 +13,8 @@ case class RoutingSettings(
fileGetConditional: Boolean,
renderVanityFooter: Boolean,
rangeCountLimit: Int,
rangeCoalescingThreshold: Long)
rangeCoalescingThreshold: Long,
decodeMaxBytesPerChunk: Int)
object RoutingSettings extends SettingsCompanion[RoutingSettings]("akka.http.routing") {
def fromSubConfig(c: Config) = apply(
@ -21,7 +22,8 @@ object RoutingSettings extends SettingsCompanion[RoutingSettings]("akka.http.rou
c getBoolean "file-get-conditional",
c getBoolean "render-vanity-footer",
c getInt "range-count-limit",
c getBytes "range-coalescing-threshold")
c getBytes "range-coalescing-threshold",
c getIntBytes "decode-max-bytes-per-chunk")
implicit def default(implicit refFactory: ActorRefFactory) =
apply(actorSystem)

View file

@ -57,12 +57,13 @@ trait CodingDirectives {
*/
def decodeRequest(decoder: Decoder): Directive0 = {
def applyDecoder =
mapRequest(decoder.decode(_).mapEntity(StreamUtils.mapEntityError {
case NonFatal(e)
IllegalRequestException(
StatusCodes.BadRequest,
ErrorInfo("The request's encoding is corrupt", e.getMessage))
}))
extractSettings.flatMap(settings
mapRequest(decoder.withMaxBytesPerChunk(settings.decodeMaxBytesPerChunk).decode(_).mapEntity(StreamUtils.mapEntityError {
case NonFatal(e)
IllegalRequestException(
StatusCodes.BadRequest,
ErrorInfo("The request's encoding is corrupt", e.getMessage))
})))
requestEntityEmpty | (
requestEncodedWith(decoder.encoding) &