From 45a2440970aaf87f7f856d8e28c3bef77dd0cb32 Mon Sep 17 00:00:00 2001 From: Evgeny Vanslov Date: Mon, 29 Feb 2016 15:40:14 +0300 Subject: [PATCH] Replaced PushStage based CheckContentLength with GraphStage #19834 --- .../impl/engine/rendering/RenderSupport.scala | 50 +++++++++++++------ project/MiMa.scala | 9 +--- 2 files changed, 36 insertions(+), 23 deletions(-) diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/rendering/RenderSupport.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/rendering/RenderSupport.scala index 66c983f498..6da97f2f42 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/rendering/RenderSupport.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/rendering/RenderSupport.scala @@ -5,9 +5,10 @@ package akka.http.impl.engine.rendering import akka.parboiled2.CharUtils -import akka.stream.SourceShape +import akka.stream.{Attributes, SourceShape} import akka.util.ByteString import akka.event.LoggingAdapter +import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage import akka.stream.scaladsl._ import akka.stream.stage._ import akka.http.scaladsl.model._ @@ -69,28 +70,45 @@ private object RenderSupport { } object CheckContentLengthTransformer { - def flow(contentLength: Long) = Flow[ByteString].transform(() ⇒ - new CheckContentLengthTransformer(contentLength)).named("checkContentLength") + def flow(contentLength: Long) = Flow[ByteString].via(new CheckContentLengthTransformer(contentLength)) } - class CheckContentLengthTransformer(length: Long) extends PushStage[ByteString, ByteString] { - var sent = 0L + final class CheckContentLengthTransformer(length: Long) extends SimpleLinearGraphStage[ByteString] { + override def initialAttributes: Attributes = Attributes.name("CheckContentLength") - override def onPush(elem: ByteString, ctx: Context[ByteString]): SyncDirective = { - sent += elem.length - if (sent > length) - ctx fail InvalidContentLengthException(s"HTTP message had declared Content-Length $length but entity data stream amounts to more bytes") - ctx.push(elem) - } + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = + new GraphStageLogic(shape) with InHandler with OutHandler { + override def toString = s"CheckContentLength(sent=$sent)" - override def onUpstreamFinish(ctx: Context[ByteString]): TerminationDirective = { - if (sent < length) - ctx fail InvalidContentLengthException(s"HTTP message had declared Content-Length $length but entity data stream amounts to ${length - sent} bytes less") - ctx.finish() - } + private var sent = 0L + override def onPush(): Unit = { + val elem = grab(in) + sent += elem.length + if (sent <= length) { + push(out, elem) + } else { + failStage(InvalidContentLengthException(s"HTTP message had declared Content-Length $length but entity data stream amounts to more bytes")) + } + } + + override def onUpstreamFinish(): Unit = { + if (sent < length) { + failStage(InvalidContentLengthException(s"HTTP message had declared Content-Length $length but entity data stream amounts to ${length - sent} bytes less")) + } else { + completeStage() + } + } + + override def onPull(): Unit = pull(in) + + setHandlers(in, out, this) + } + + override def toString = "CheckContentLength" } + private def renderChunk(chunk: HttpEntity.ChunkStreamPart): ByteString = { import chunk._ val renderedSize = // buffer space required for rendering (without trailer) diff --git a/project/MiMa.scala b/project/MiMa.scala index aececeaed8..587ed4ce0b 100644 --- a/project/MiMa.scala +++ b/project/MiMa.scala @@ -673,13 +673,8 @@ object MiMa extends AutoPlugin { ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.http.scaladsl.model.headers.CacheDirectives#private.apply"), ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.http.scaladsl.model.headers.CacheDirectives#no-cache.apply"), - ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.http.impl.model.parser.SimpleHeaders.strict-transport-security"), - ProblemFilters.exclude[DirectMissingMethodProblem]("akka.http.impl.engine.rendering.RequestRenderingContext.copy"), - ProblemFilters.exclude[DirectMissingMethodProblem]("akka.http.impl.engine.rendering.RequestRenderingContext.this"), - ProblemFilters.exclude[MissingTypesProblem]("akka.http.impl.engine.rendering.RequestRenderingContext$"), - ProblemFilters.exclude[DirectMissingMethodProblem]("akka.http.impl.engine.rendering.RequestRenderingContext.apply"), - ProblemFilters.exclude[DirectMissingMethodProblem]("akka.http.impl.engine.parsing.HttpResponseParser.setRequestMethodForNextResponse"), - ProblemFilters.exclude[DirectMissingMethodProblem]("akka.http.impl.util.package.errorHandling"), + // #19913 internal and shouldn't be public + FilterAnyProblemStartingWith("akka.http.impl"), // #19983 withoutSizeLimit overrides for Scala API ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.http.scaladsl.model.RequestEntity.withoutSizeLimit"),