=htc #19352 HttpServerBluePrint: get rid of costly flatMapConcat on the rendering-side

This commit is contained in:
Johannes Rudolph 2016-01-05 12:17:18 +01:00
parent ced5aa7ddc
commit 1f00b0c1c0
4 changed files with 195 additions and 153 deletions

View file

@ -6,6 +6,7 @@ package akka.http.impl.engine.rendering
import akka.http.impl.engine.ws.{ FrameEvent, UpgradeToWebsocketResponseHeader } import akka.http.impl.engine.ws.{ FrameEvent, UpgradeToWebsocketResponseHeader }
import akka.http.scaladsl.model.ws.Message import akka.http.scaladsl.model.ws.Message
import akka.stream.{ Outlet, Inlet, Attributes, FlowShape }
import scala.annotation.tailrec import scala.annotation.tailrec
import akka.event.LoggingAdapter import akka.event.LoggingAdapter
@ -52,170 +53,201 @@ private[http] class HttpResponseRendererFactory(serverHeader: Option[headers.Ser
// split out so we can stabilize by overriding in tests // split out so we can stabilize by overriding in tests
protected def currentTimeMillis(): Long = System.currentTimeMillis() protected def currentTimeMillis(): Long = System.currentTimeMillis()
def newRenderer: HttpResponseRenderer = new HttpResponseRenderer def renderer: Flow[ResponseRenderingContext, ResponseRenderingOutput, Unit] = Flow.fromGraph(HttpResponseRenderer)
final class HttpResponseRenderer extends PushStage[ResponseRenderingContext, Source[ResponseRenderingOutput, Any]] { object HttpResponseRenderer extends GraphStage[FlowShape[ResponseRenderingContext, ResponseRenderingOutput]] {
val in = Inlet[ResponseRenderingContext]("in")
val out = Outlet[ResponseRenderingOutput]("out")
val shape: FlowShape[ResponseRenderingContext, ResponseRenderingOutput] = FlowShape(in, out)
private[this] var closeMode: CloseMode = DontClose // signals what to do after the current response def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
private[this] def close: Boolean = closeMode != DontClose new GraphStageLogic(shape) {
private[this] def closeIf(cond: Boolean): Unit = private[this] var closeMode: CloseMode = DontClose // signals what to do after the current response
if (cond) closeMode = CloseConnection private[this] def close: Boolean = closeMode != DontClose
private[this] def closeIf(cond: Boolean): Unit =
if (cond) closeMode = CloseConnection
// need this for testing setHandler(in, new InHandler {
private[http] def isComplete = close def onPush(): Unit =
render(grab(in)) match {
case Strict(outElement)
push(out, outElement)
if (close) completeStage()
case Streamed(outStream) transfer(outStream)
}
override def onPush(ctx: ResponseRenderingContext, opCtx: Context[Source[ResponseRenderingOutput, Any]]): SyncDirective = { override def onUpstreamFinish(): Unit = closeMode = CloseConnection
val r = new ByteStringRendering(responseHeaderSizeHint) })
val waitForDemandHandler = new OutHandler {
import ctx.response._ def onPull(): Unit = if (close) completeStage() else pull(in)
val noEntity = entity.isKnownEmpty || ctx.requestMethod == HttpMethods.HEAD }
setHandler(out, waitForDemandHandler)
def renderStatusLine(): Unit = def transfer(outStream: Source[ResponseRenderingOutput, Any]): Unit = {
protocol match { val sinkIn = new SubSinkInlet[ResponseRenderingOutput]("RenderingSink")
case `HTTP/1.1` if (status eq StatusCodes.OK) r ~~ DefaultStatusLineBytes else r ~~ StatusLineStartBytes ~~ status ~~ CrLf sinkIn.setHandler(new InHandler {
case `HTTP/1.0` r ~~ protocol ~~ ' ' ~~ status ~~ CrLf def onPush(): Unit = push(out, sinkIn.grab())
override def onUpstreamFinish(): Unit = if (close) completeStage() else setHandler(out, waitForDemandHandler)
})
setHandler(out, new OutHandler {
def onPull(): Unit = sinkIn.pull()
})
sinkIn.pull()
Source.fromGraph(outStream).runWith(sinkIn.sink)(interpreter.subFusingMaterializer)
} }
def render(h: HttpHeader) = r ~~ h ~~ CrLf def render(ctx: ResponseRenderingContext): StrictOrStreamed = {
val r = new ByteStringRendering(responseHeaderSizeHint)
def mustRenderTransferEncodingChunkedHeader = import ctx.response._
entity.isChunked && (!entity.isKnownEmpty || ctx.requestMethod == HttpMethods.HEAD) && (ctx.requestProtocol == `HTTP/1.1`) val noEntity = entity.isKnownEmpty || ctx.requestMethod == HttpMethods.HEAD
@tailrec def renderHeaders(remaining: List[HttpHeader], alwaysClose: Boolean = false, def renderStatusLine(): Unit =
connHeader: Connection = null, serverSeen: Boolean = false, protocol match {
transferEncodingSeen: Boolean = false, dateSeen: Boolean = false): Unit = case `HTTP/1.1` if (status eq StatusCodes.OK) r ~~ DefaultStatusLineBytes else r ~~ StatusLineStartBytes ~~ status ~~ CrLf
remaining match { case `HTTP/1.0` r ~~ protocol ~~ ' ' ~~ status ~~ CrLf
case head :: tail head match { }
case x: `Content-Length`
suppressionWarning(log, x, "explicit `Content-Length` header is not allowed. Use the appropriate HttpEntity subtype.")
renderHeaders(tail, alwaysClose, connHeader, serverSeen, transferEncodingSeen, dateSeen)
case x: `Content-Type` def render(h: HttpHeader) = r ~~ h ~~ CrLf
suppressionWarning(log, x, "explicit `Content-Type` header is not allowed. Set `HttpResponse.entity.contentType` instead.")
renderHeaders(tail, alwaysClose, connHeader, serverSeen, transferEncodingSeen, dateSeen)
case x: Date def mustRenderTransferEncodingChunkedHeader =
render(x) entity.isChunked && (!entity.isKnownEmpty || ctx.requestMethod == HttpMethods.HEAD) && (ctx.requestProtocol == `HTTP/1.1`)
renderHeaders(tail, alwaysClose, connHeader, serverSeen, transferEncodingSeen, dateSeen = true)
case x: `Transfer-Encoding` @tailrec def renderHeaders(remaining: List[HttpHeader], alwaysClose: Boolean = false,
x.withChunkedPeeled match { connHeader: Connection = null, serverSeen: Boolean = false,
case None transferEncodingSeen: Boolean = false, dateSeen: Boolean = false): Unit =
suppressionWarning(log, head) remaining match {
case head :: tail head match {
case x: `Content-Length`
suppressionWarning(log, x, "explicit `Content-Length` header is not allowed. Use the appropriate HttpEntity subtype.")
renderHeaders(tail, alwaysClose, connHeader, serverSeen, transferEncodingSeen, dateSeen)
case x: `Content-Type`
suppressionWarning(log, x, "explicit `Content-Type` header is not allowed. Set `HttpResponse.entity.contentType` instead.")
renderHeaders(tail, alwaysClose, connHeader, serverSeen, transferEncodingSeen, dateSeen)
case x: Date
render(x)
renderHeaders(tail, alwaysClose, connHeader, serverSeen, transferEncodingSeen, dateSeen = true)
case x: `Transfer-Encoding`
x.withChunkedPeeled match {
case None
suppressionWarning(log, head)
renderHeaders(tail, alwaysClose, connHeader, serverSeen, transferEncodingSeen, dateSeen)
case Some(te)
// if the user applied some custom transfer-encoding we need to keep the header
render(if (mustRenderTransferEncodingChunkedHeader) te.withChunked else te)
renderHeaders(tail, alwaysClose, connHeader, serverSeen, transferEncodingSeen = true, dateSeen)
}
case x: Connection
val connectionHeader = if (connHeader eq null) x else Connection(x.tokens ++ connHeader.tokens)
renderHeaders(tail, alwaysClose, connectionHeader, serverSeen, transferEncodingSeen, dateSeen)
case x: Server
render(x)
renderHeaders(tail, alwaysClose, connHeader, serverSeen = true, transferEncodingSeen, dateSeen)
case x: CustomHeader
if (!x.suppressRendering) render(x)
renderHeaders(tail, alwaysClose, connHeader, serverSeen, transferEncodingSeen, dateSeen)
case x: RawHeader if (x is "content-type") || (x is "content-length") || (x is "transfer-encoding") ||
(x is "date") || (x is "server") || (x is "connection")
suppressionWarning(log, x, "illegal RawHeader")
renderHeaders(tail, alwaysClose, connHeader, serverSeen, transferEncodingSeen, dateSeen)
case x
render(x)
renderHeaders(tail, alwaysClose, connHeader, serverSeen, transferEncodingSeen, dateSeen) renderHeaders(tail, alwaysClose, connHeader, serverSeen, transferEncodingSeen, dateSeen)
case Some(te)
// if the user applied some custom transfer-encoding we need to keep the header
render(if (mustRenderTransferEncodingChunkedHeader) te.withChunked else te)
renderHeaders(tail, alwaysClose, connHeader, serverSeen, transferEncodingSeen = true, dateSeen)
} }
case x: Connection case Nil
val connectionHeader = if (connHeader eq null) x else Connection(x.tokens ++ connHeader.tokens) if (!serverSeen) renderDefaultServerHeader(r)
renderHeaders(tail, alwaysClose, connectionHeader, serverSeen, transferEncodingSeen, dateSeen) if (!dateSeen) r ~~ dateHeader
case x: Server // Do we close the connection after this response?
render(x) closeIf {
renderHeaders(tail, alwaysClose, connHeader, serverSeen = true, transferEncodingSeen, dateSeen) // if we are prohibited to keep-alive by the spec
alwaysClose ||
// if the client wants to close and we don't override
(ctx.closeRequested && ((connHeader eq null) || !connHeader.hasKeepAlive)) ||
// if the application wants to close explicitly
(protocol match {
case `HTTP/1.1` (connHeader ne null) && connHeader.hasClose
case `HTTP/1.0` if (connHeader eq null) ctx.requestProtocol == `HTTP/1.1` else !connHeader.hasKeepAlive
})
}
case x: CustomHeader // Do we render an explicit Connection header?
if (!x.suppressRendering) render(x) val renderConnectionHeader =
renderHeaders(tail, alwaysClose, connHeader, serverSeen, transferEncodingSeen, dateSeen) protocol == `HTTP/1.0` && !close || protocol == `HTTP/1.1` && close || // if we don't follow the default behavior
close != ctx.closeRequested || // if we override the client's closing request
protocol != ctx.requestProtocol // if we reply with a mismatching protocol (let's be very explicit in this case)
case x: RawHeader if (x is "content-type") || (x is "content-length") || (x is "transfer-encoding") || if (renderConnectionHeader)
(x is "date") || (x is "server") || (x is "connection") r ~~ Connection ~~ (if (close) CloseBytes else KeepAliveBytes) ~~ CrLf
suppressionWarning(log, x, "illegal RawHeader") else if (connHeader != null && connHeader.hasUpgrade) {
renderHeaders(tail, alwaysClose, connHeader, serverSeen, transferEncodingSeen, dateSeen) r ~~ connHeader ~~ CrLf
headers
case x .collectFirst { case u: UpgradeToWebsocketResponseHeader u }
render(x) .foreach { header closeMode = SwitchToWebsocket(header.handler) }
renderHeaders(tail, alwaysClose, connHeader, serverSeen, transferEncodingSeen, dateSeen) }
} if (mustRenderTransferEncodingChunkedHeader && !transferEncodingSeen)
r ~~ `Transfer-Encoding` ~~ ChunkedBytes ~~ CrLf
case Nil
if (!serverSeen) renderDefaultServerHeader(r)
if (!dateSeen) r ~~ dateHeader
// Do we close the connection after this response?
closeIf {
// if we are prohibited to keep-alive by the spec
alwaysClose ||
// if the client wants to close and we don't override
(ctx.closeRequested && ((connHeader eq null) || !connHeader.hasKeepAlive)) ||
// if the application wants to close explicitly
(protocol match {
case `HTTP/1.1` (connHeader ne null) && connHeader.hasClose
case `HTTP/1.0` if (connHeader eq null) ctx.requestProtocol == `HTTP/1.1` else !connHeader.hasKeepAlive
})
} }
// Do we render an explicit Connection header? def renderContentLengthHeader(contentLength: Long) =
val renderConnectionHeader = if (status.allowsEntity) r ~~ `Content-Length` ~~ contentLength ~~ CrLf else r
protocol == `HTTP/1.0` && !close || protocol == `HTTP/1.1` && close || // if we don't follow the default behavior
close != ctx.closeRequested || // if we override the client's closing request
protocol != ctx.requestProtocol // if we reply with a mismatching protocol (let's be very explicit in this case)
if (renderConnectionHeader) def byteStrings(entityBytes: Source[ByteString, Any]): Source[ResponseRenderingOutput, Any] =
r ~~ Connection ~~ (if (close) CloseBytes else KeepAliveBytes) ~~ CrLf renderByteStrings(r, entityBytes, skipEntity = noEntity).map(ResponseRenderingOutput.HttpData(_))
else if (connHeader != null && connHeader.hasUpgrade) {
r ~~ connHeader ~~ CrLf def completeResponseRendering(entity: ResponseEntity): StrictOrStreamed =
headers entity match {
.collectFirst { case u: UpgradeToWebsocketResponseHeader u } case HttpEntity.Strict(_, data)
.foreach { header closeMode = SwitchToWebsocket(header.handler) } renderHeaders(headers.toList)
renderEntityContentType(r, entity)
renderContentLengthHeader(data.length) ~~ CrLf
if (!noEntity) r ~~ data
Strict {
closeMode match {
case SwitchToWebsocket(handler) ResponseRenderingOutput.SwitchToWebsocket(r.get, handler)
case _ ResponseRenderingOutput.HttpData(r.get)
}
}
case HttpEntity.Default(_, contentLength, data)
renderHeaders(headers.toList)
renderEntityContentType(r, entity)
renderContentLengthHeader(contentLength) ~~ CrLf
Streamed(byteStrings(data.via(CheckContentLengthTransformer.flow(contentLength))))
case HttpEntity.CloseDelimited(_, data)
renderHeaders(headers.toList, alwaysClose = ctx.requestMethod != HttpMethods.HEAD)
renderEntityContentType(r, entity) ~~ CrLf
Streamed(byteStrings(data))
case HttpEntity.Chunked(contentType, chunks)
if (ctx.requestProtocol == `HTTP/1.0`)
completeResponseRendering(HttpEntity.CloseDelimited(contentType, chunks.map(_.data)))
else {
renderHeaders(headers.toList)
renderEntityContentType(r, entity) ~~ CrLf
Streamed(byteStrings(chunks.via(ChunkTransformer.flow)))
}
} }
if (mustRenderTransferEncodingChunkedHeader && !transferEncodingSeen)
r ~~ `Transfer-Encoding` ~~ ChunkedBytes ~~ CrLf renderStatusLine()
completeResponseRendering(entity)
} }
}
def renderContentLengthHeader(contentLength: Long) = sealed trait StrictOrStreamed
if (status.allowsEntity) r ~~ `Content-Length` ~~ contentLength ~~ CrLf else r case class Strict(bytes: ResponseRenderingOutput) extends StrictOrStreamed
case class Streamed(source: Source[ResponseRenderingOutput, Any]) extends StrictOrStreamed
def byteStrings(entityBytes: Source[ByteString, Any]): Source[ResponseRenderingOutput, Any] =
renderByteStrings(r, entityBytes, skipEntity = noEntity).map(ResponseRenderingOutput.HttpData(_))
def completeResponseRendering(entity: ResponseEntity): Source[ResponseRenderingOutput, Any] =
entity match {
case HttpEntity.Strict(_, data)
renderHeaders(headers.toList)
renderEntityContentType(r, entity)
renderContentLengthHeader(data.length) ~~ CrLf
if (!noEntity) r ~~ data
Source.single {
closeMode match {
case SwitchToWebsocket(handler) ResponseRenderingOutput.SwitchToWebsocket(r.get, handler)
case _ ResponseRenderingOutput.HttpData(r.get)
}
}
case HttpEntity.Default(_, contentLength, data)
renderHeaders(headers.toList)
renderEntityContentType(r, entity)
renderContentLengthHeader(contentLength) ~~ CrLf
byteStrings(data.via(CheckContentLengthTransformer.flow(contentLength)))
case HttpEntity.CloseDelimited(_, data)
renderHeaders(headers.toList, alwaysClose = ctx.requestMethod != HttpMethods.HEAD)
renderEntityContentType(r, entity) ~~ CrLf
byteStrings(data)
case HttpEntity.Chunked(contentType, chunks)
if (ctx.requestProtocol == `HTTP/1.0`)
completeResponseRendering(HttpEntity.CloseDelimited(contentType, chunks.map(_.data)))
else {
renderHeaders(headers.toList)
renderEntityContentType(r, entity) ~~ CrLf
byteStrings(chunks.via(ChunkTransformer.flow))
}
}
renderStatusLine()
val result = completeResponseRendering(entity)
if (close)
opCtx.pushAndFinish(result)
else
opCtx.push(result)
}
} }
sealed trait CloseMode sealed trait CloseMode

View file

@ -166,8 +166,7 @@ private[http] object HttpServerBluePrint {
} }
Flow[ResponseRenderingContext] Flow[ResponseRenderingContext]
.via(Flow[ResponseRenderingContext].transform(() responseRendererFactory.newRenderer).named("renderer")) .via(responseRendererFactory.renderer.named("renderer"))
.flatMapConcat(ConstantFun.scalaIdentityFunction)
.via(Flow[ResponseRenderingOutput].transform(() errorHandling(errorHandler)).named("errorLogger")) .via(Flow[ResponseRenderingOutput].transform(() errorHandling(errorHandler)).named("errorLogger"))
} }

View file

@ -19,6 +19,8 @@ import akka.stream.scaladsl._
import akka.stream.ActorMaterializer import akka.stream.ActorMaterializer
import HttpEntity._ import HttpEntity._
import scala.util.control.NonFatal
class ResponseRendererSpec extends FreeSpec with Matchers with BeforeAndAfterAll { class ResponseRendererSpec extends FreeSpec with Matchers with BeforeAndAfterAll {
val testConf: Config = ConfigFactory.parseString(""" val testConf: Config = ConfigFactory.parseString("""
akka.event-handlers = ["akka.testkit.TestEventListener"] akka.event-handlers = ["akka.testkit.TestEventListener"]
@ -583,17 +585,26 @@ class ResponseRendererSpec extends FreeSpec with Matchers with BeforeAndAfterAll
def renderTo(expected: String, close: Boolean): Matcher[ResponseRenderingContext] = def renderTo(expected: String, close: Boolean): Matcher[ResponseRenderingContext] =
equal(expected.stripMarginWithNewline("\r\n") -> close).matcher[(String, Boolean)] compose { ctx equal(expected.stripMarginWithNewline("\r\n") -> close).matcher[(String, Boolean)] compose { ctx
val renderer = newRenderer val (wasCompletedFuture, resultFuture) =
val rendererOutputSource = Await.result(Source.single(ctx) (Source.single(ctx) ++ Source.maybe[ResponseRenderingContext]) // never send upstream completion
.transform(() renderer).named("renderer") .via(renderer.named("renderer"))
.runWith(Sink.head), 1.second) .map {
val future =
rendererOutputSource.grouped(1000).map(
_.map {
case ResponseRenderingOutput.HttpData(bytes) bytes case ResponseRenderingOutput.HttpData(bytes) bytes
case _: ResponseRenderingOutput.SwitchToWebsocket throw new IllegalStateException("Didn't expect websocket response") case _: ResponseRenderingOutput.SwitchToWebsocket throw new IllegalStateException("Didn't expect websocket response")
}).runWith(Sink.head).map(_.reduceLeft(_ ++ _).utf8String) }
Await.result(future, 250.millis) -> renderer.isComplete .groupedWithin(1000, 100.millis)
.viaMat(StreamUtils.identityFinishReporter[Seq[ByteString]])(Keep.right)
.toMat(Sink.head)(Keep.both).run()
// we try to find out if the renderer has already flagged completion even without the upstream being completed
val wasCompleted =
try {
Await.ready(wasCompletedFuture, 100.millis)
true
} catch {
case NonFatal(_) false
}
Await.result(resultFuture, 250.millis).reduceLeft(_ ++ _).utf8String -> wasCompleted
} }
override def currentTimeMillis() = DateTime(2011, 8, 25, 9, 10, 29).clicks // provide a stable date for testing override def currentTimeMillis() = DateTime(2011, 8, 25, 9, 10, 29).clicks // provide a stable date for testing

View file

@ -315,7 +315,7 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
/** /**
* INTERNAL API * INTERNAL API
*/ */
private[stream] def interpreter: GraphInterpreter = private[akka] def interpreter: GraphInterpreter =
if (_interpreter == null) if (_interpreter == null)
throw new IllegalStateException("not yet initialized: only setHandler is allowed in GraphStageLogic constructor") throw new IllegalStateException("not yet initialized: only setHandler is allowed in GraphStageLogic constructor")
else _interpreter else _interpreter