!htp add some FIXMEs and small fixes

This commit is contained in:
Roland Kuhn 2014-12-20 12:58:32 +01:00
parent 2d140eee30
commit 3eb8580eda
5 changed files with 40 additions and 25 deletions

View file

@ -50,7 +50,7 @@ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.E
*/
def serverFlowToTransport(serverFlow: Flow[HttpRequest, HttpResponse],
settings: Option[ServerSettings] = None,
log: LoggingAdapter = system.log): Flow[ByteString, ByteString] = {
log: LoggingAdapter = system.log)(implicit mat: FlowMaterializer): Flow[ByteString, ByteString] = {
val effectiveSettings = ServerSettings(settings)
HttpServer.serverFlowToTransport(serverFlow, effectiveSettings, log)
}

View file

@ -7,6 +7,7 @@ import akka.stream.actor.{ ActorPublisherMessage, ActorPublisher }
* An actor publisher for producing a simple stream of singleton tokens
* the release of which is triggered by the reception of a [[TokenSourceActor.Trigger]] message.
*/
// FIXME #16520 move this into streams
private[engine] class TokenSourceActor[T](token: T) extends ActorPublisher[T] {
private var triggered = 0

View file

@ -156,49 +156,54 @@ private[http] object HttpClient {
Vector(dataInput, methodBypassInput)
}
private val stay = (ctx: MergeLogicContext) SameState
private val gotoResponseReading = (ctx: MergeLogicContext) {
ctx.changeCompletionHandling(responseReadingCompletionHandling)
responseReadingState
}
private val gotoInitial = (ctx: MergeLogicContext) {
if (methodBypassCompleted) {
ctx.complete()
SameState
} else {
ctx.changeCompletionHandling(initialCompletionHandling)
initialState
}
}
override val initialState: State[HttpMethod] =
State(Read(methodBypassInput)) {
case (ctx, _, method)
parser.setRequestMethodForNextResponse(method)
drainParser(parser.onPush(ByteString.empty), ctx,
onNeedNextMethod = () SameState,
onNeedMoreData = () {
ctx.changeCompletionHandling(responseReadingCompletionHandling)
responseReadingState
})
onNeedNextMethod = stay,
onNeedMoreData = gotoResponseReading)
}
val responseReadingState: State[ByteString] =
State(Read(dataInput)) {
case (ctx, _, bytes)
drainParser(parser.onPush(bytes), ctx,
onNeedNextMethod = () {
if (methodBypassCompleted) {
ctx.complete()
SameState
} else {
ctx.changeCompletionHandling(initialCompletionHandling)
initialState
}
},
onNeedMoreData = () SameState)
onNeedNextMethod = gotoInitial,
onNeedMoreData = stay)
}
@tailrec def drainParser(current: ResponseOutput, ctx: MergeLogicContext,
onNeedNextMethod: () State[_], onNeedMoreData: () State[_],
onNeedNextMethod: MergeLogicContext State[_],
onNeedMoreData: MergeLogicContext State[_],
b: ListBuffer[ResponseOutput] = ListBuffer.empty): State[_] = {
def emit(output: List[ResponseOutput]): Unit = if (output.nonEmpty) ctx.emit(output)
current match {
case NeedNextRequestMethod
emit(b.result())
onNeedNextMethod()
onNeedNextMethod(ctx)
case StreamEnd
emit(b.result())
ctx.complete()
SameState
case NeedMoreData
emit(b.result())
onNeedMoreData()
onNeedMoreData(ctx)
case x drainParser(parser.onPull(), ctx, onNeedNextMethod, onNeedMoreData, b += x)
}
}

View file

@ -54,7 +54,7 @@ private[http] final class BodyPartParser(defaultContentType: ContentType,
if (illegalHeaderWarnings) log.warning(errorInfo.withSummaryPrepended("Illegal multipart header").formatPretty)
}
private[this] var output = collection.immutable.Queue.empty[Output]
private[this] var output = collection.immutable.Queue.empty[Output] // FIXME this probably is too wasteful
private[this] var state: ByteString StateResult = tryParseInitialBoundary
private[this] var receivedInitialBoundary = false
private[this] var terminated = false

View file

@ -18,6 +18,8 @@ import akka.http.engine.parsing.ParserOutput._
import akka.http.engine.TokenSourceActor
import akka.http.model._
import akka.http.util._
import akka.stream.FlowMaterializer
import akka.stream.OverflowStrategy
/**
* INTERNAL API
@ -26,7 +28,7 @@ private[http] object HttpServer {
def serverFlowToTransport(serverFlow: Flow[HttpRequest, HttpResponse],
settings: ServerSettings,
log: LoggingAdapter): Flow[ByteString, ByteString] = {
log: LoggingAdapter)(implicit mat: FlowMaterializer): Flow[ByteString, ByteString] = {
// the initial header parser we initially use for every connection,
// will not be mutated, all "shared copy" parsers copy on first-write into the header cache
@ -60,13 +62,17 @@ private[http] object HttpServer {
Flow[RequestOutput]
.splitWhen(x x.isInstanceOf[MessageStart] || x == MessageEnd)
.headAndTail
.collect {
.map {
case (RequestStart(method, uri, protocol, headers, createEntity, _, _), entityParts)
val effectiveUri = HttpRequest.effectiveUri(uri, headers, securedConnection = false, settings.defaultHostHeader)
val effectiveMethod = if (method == HttpMethods.HEAD && settings.transparentHeadRequests) HttpMethods.GET else method
HttpRequest(effectiveMethod, effectiveUri, headers, createEntity(entityParts), protocol)
}
.take(Int.MaxValue) // FIXME: removing this makes the akka.http.engine.server.HttpServerSpec fail!
case (_, src) src.runWith(BlackholeSink)
}.collect {
case r: HttpRequest r
}.buffer(1, OverflowStrategy.backpressure)
// FIXME #16583 it is unclear why this is needed, some element probably does not propagate demand eagerly enough
// the failing test would be HttpServerSpec
// we need to make sure that only one element per incoming request is queueing up in front of
// the bypassMerge.bypassInput. Otherwise the rising backpressure against the bypassFanout
@ -132,7 +138,10 @@ private[http] object HttpServer {
// see the comment on [[OneHundredContinue]] for an explanation of the closing logic here (and more)
val close = requestStart.closeAfterResponseCompletion || requestStart.expect100ContinueResponsePending
ctx.emit(ResponseRenderingContext(response, requestStart.method, requestStart.protocol, close))
if (close) finish(ctx) else initialState
if (close) finish(ctx) else {
ctx.changeCompletionHandling(eagerClose)
initialState
}
case (ctx, _, OneHundredContinue)
assert(requestStart.expect100ContinueResponsePending)