diff --git a/akka-http-core/src/main/scala/akka/http/engine/server/HttpServer.scala b/akka-http-core/src/main/scala/akka/http/engine/server/HttpServer.scala index 785fdd68e5..7b125f196d 100644 --- a/akka-http-core/src/main/scala/akka/http/engine/server/HttpServer.scala +++ b/akka-http-core/src/main/scala/akka/http/engine/server/HttpServer.scala @@ -70,9 +70,9 @@ private[http] object HttpServer { case (_, src) ⇒ src.runWith(BlackholeSink) }.collect { case r: HttpRequest ⇒ r - }.buffer(1, OverflowStrategy.backpressure) - // FIXME #16583 it is unclear why this is needed, some element probably does not propagate demand eagerly enough - // the failing test would be HttpServerSpec + }.buffer(1, OverflowStrategy.backpressure) + // FIXME #16583 it is unclear why this is needed, some element probably does not propagate demand eagerly enough + // the failing test would be HttpServerSpec // we need to make sure that only one element per incoming request is queueing up in front of // the bypassMerge.bypassInput. Otherwise the rising backpressure against the bypassFanout diff --git a/akka-http-tests/src/test/scala/akka/http/coding/CoderSpec.scala b/akka-http-tests/src/test/scala/akka/http/coding/CoderSpec.scala index 082036308d..7429e539f6 100644 --- a/akka-http-tests/src/test/scala/akka/http/coding/CoderSpec.scala +++ b/akka-http-tests/src/test/scala/akka/http/coding/CoderSpec.scala @@ -17,7 +17,7 @@ import akka.http.util._ import akka.http.model.HttpMethods._ import akka.http.model.{ HttpEntity, HttpRequest } -import akka.stream.scaladsl.Source +import akka.stream.scaladsl.{ Sink, Source } import akka.util.ByteString import scala.util.control.NoStackTrace @@ -104,9 +104,9 @@ abstract class CoderSpec extends WordSpec with CodecSpecSupport with Inspectors val compressed = streamEncode(ByteString(array)) val limit = 10000 val resultBs = - Source.singleton(compressed) + Source.single(compressed) .via(Coder.withMaxBytesPerChunk(limit).decoderFlow) - .collectAll + .grouped(4200).runWith(Sink.head) .awaitResult(1.second) forAll(resultBs) { bs ⇒ diff --git a/akka-http/src/main/resources/reference.conf b/akka-http/src/main/resources/reference.conf index 7c8a4d5f36..5ce63ffed3 100644 --- a/akka-http/src/main/resources/reference.conf +++ b/akka-http/src/main/resources/reference.conf @@ -32,4 +32,8 @@ akka.http.routing { # The maximum number of allowed ranges per request. # Requests with more ranges will be rejected due to DOS suspicion. range-count-limit = 16 + + # The maximum number of bytes per ByteString a decoding directive will produce + # for an entity data stream. + decode-max-bytes-per-chunk = 1m } diff --git a/akka-http/src/main/scala/akka/http/server/RoutingSettings.scala b/akka-http/src/main/scala/akka/http/server/RoutingSettings.scala index bf500898ff..b01db533fd 100644 --- a/akka-http/src/main/scala/akka/http/server/RoutingSettings.scala +++ b/akka-http/src/main/scala/akka/http/server/RoutingSettings.scala @@ -13,7 +13,8 @@ case class RoutingSettings( fileGetConditional: Boolean, renderVanityFooter: Boolean, rangeCountLimit: Int, - rangeCoalescingThreshold: Long) + rangeCoalescingThreshold: Long, + decodeMaxBytesPerChunk: Int) object RoutingSettings extends SettingsCompanion[RoutingSettings]("akka.http.routing") { def fromSubConfig(c: Config) = apply( @@ -21,7 +22,8 @@ object RoutingSettings extends SettingsCompanion[RoutingSettings]("akka.http.rou c getBoolean "file-get-conditional", c getBoolean "render-vanity-footer", c getInt "range-count-limit", - c getBytes "range-coalescing-threshold") + c getBytes "range-coalescing-threshold", + c getIntBytes "decode-max-bytes-per-chunk") implicit def default(implicit refFactory: ActorRefFactory) = apply(actorSystem) diff --git a/akka-http/src/main/scala/akka/http/server/directives/CodingDirectives.scala b/akka-http/src/main/scala/akka/http/server/directives/CodingDirectives.scala index ccd1d44cab..e4494daea8 100644 --- a/akka-http/src/main/scala/akka/http/server/directives/CodingDirectives.scala +++ b/akka-http/src/main/scala/akka/http/server/directives/CodingDirectives.scala @@ -57,12 +57,13 @@ trait CodingDirectives { */ def decodeRequest(decoder: Decoder): Directive0 = { def applyDecoder = - mapRequest(decoder.decode(_).mapEntity(StreamUtils.mapEntityError { - case NonFatal(e) ⇒ - IllegalRequestException( - StatusCodes.BadRequest, - ErrorInfo("The request's encoding is corrupt", e.getMessage)) - })) + extractSettings.flatMap(settings ⇒ + mapRequest(decoder.withMaxBytesPerChunk(settings.decodeMaxBytesPerChunk).decode(_).mapEntity(StreamUtils.mapEntityError { + case NonFatal(e) ⇒ + IllegalRequestException( + StatusCodes.BadRequest, + ErrorInfo("The request's encoding is corrupt", e.getMessage)) + }))) requestEntityEmpty | ( requestEncodedWith(decoder.encoding) &