Merge pull request #16580 from spray/wip-16574-mathias

=htc #16574 fix large requests not being consumable on the server-side
Roland Kuhn 2014-12-20 12:56:50 +01:00
commit 2d140eee30
35 changed files with 945 additions and 310 deletions


@ -91,7 +91,7 @@ object Http extends ExtensionId[HttpExt] with ExtensionIdProvider {
/**
* Represents a prospective HTTP server binding.
*/
trait ServerBinding {
sealed trait ServerBinding {
/**
* The local address of the endpoint bound by the materialization of the `connections` [[Source]]
* whose [[MaterializedMap]] is passed as parameter.
@ -205,10 +205,6 @@ object Http extends ExtensionId[HttpExt] with ExtensionIdProvider {
def flow: Flow[HttpRequest, HttpResponse]
}
class RequestTimeoutException(val request: HttpRequest, message: String) extends RuntimeException(message)
class StreamException(val info: ErrorInfo) extends RuntimeException(info.summary)
//////////////////// EXTENSION SETUP ///////////////////
def apply()(implicit system: ActorSystem): HttpExt = super.apply(system)


@ -0,0 +1,35 @@
package akka.http.engine
import scala.annotation.tailrec
import akka.stream.actor.{ ActorPublisherMessage, ActorPublisher }
/**
* An actor publisher for producing a simple stream of singleton tokens
* the release of which is triggered by the reception of a [[TokenSourceActor.Trigger]] message.
*/
private[engine] class TokenSourceActor[T](token: T) extends ActorPublisher[T] {
private var triggered = 0
def receive = {
case TokenSourceActor.Trigger ⇒
triggered += 1
tryDispatch()
case ActorPublisherMessage.Request(_) ⇒
tryDispatch()
case ActorPublisherMessage.Cancel ⇒
context.stop(self)
}
@tailrec private def tryDispatch(): Unit =
if (triggered > 0 && totalDemand > 0) {
onNext(token)
triggered -= 1
tryDispatch()
}
}
private[engine] object TokenSourceActor {
case object Trigger
}
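// Editor's sketch (not part of this PR): one way a TokenSourceActor could be wired up,
// mirroring the `oneHundredContinueSource` setup in HttpServer further below. The name
// `tokenRef` and the "token" element are made up for illustration only.
//
//   @volatile var tokenRef: Option[ActorRef] = None
//   val tokens: Source[String] = Source[String] {
//     Props {
//       val actor = new TokenSourceActor("token")
//       tokenRef = Some(actor.context.self)
//       actor
//     }
//   }
//   // each Trigger releases exactly one "token" element as soon as downstream demand exists
//   tokenRef.foreach(_ ! TokenSourceActor.Trigger)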


@ -5,16 +5,17 @@
package akka.http.engine.client
import java.net.InetSocketAddress
import scala.collection.immutable.Queue
import scala.annotation.tailrec
import scala.collection.mutable.ListBuffer
import akka.stream.stage._
import akka.util.ByteString
import akka.event.LoggingAdapter
import akka.stream.FlattenStrategy
import akka.stream.scaladsl._
import akka.stream.scaladsl.OperationAttributes._
import akka.http.model.{ HttpMethod, HttpRequest, HttpResponse }
import akka.http.model.{ IllegalResponseException, HttpMethod, HttpRequest, HttpResponse }
import akka.http.engine.rendering.{ RequestRenderingContext, HttpRequestRendererFactory }
import akka.http.engine.parsing.{ HttpHeaderParser, HttpResponseParser }
import akka.http.engine.parsing.ParserOutput._
import akka.http.engine.parsing.{ ParserOutput, HttpHeaderParser, HttpResponseParser }
import akka.http.util._
/**
@ -37,39 +38,212 @@ private[http] object HttpClient {
})
val requestRendererFactory = new HttpRequestRendererFactory(userAgentHeader, requestHeaderSizeHint, log)
val requestMethodByPass = new RequestMethodByPass(remoteAddress)
Flow[HttpRequest]
.map(requestMethodByPass)
/*
Basic Stream Setup
==================
requestIn +----------+
+-----------------------------------------------+--->| Termi- | requestRendering
| | nation +---------------------> |
+-------------------------------------->| Merge | |
| Termination Backchannel | +----------+ | TCP-
| | | level
| | Method | client
| +------------+ | Bypass | flow
responseOut | responsePrep | Response |<---+ |
<------------+----------------| Parsing | |
| Merge |<------------------------------------------ V
+------------+
*/
val requestIn = UndefinedSource[HttpRequest]
val responseOut = UndefinedSink[HttpResponse]
val methodBypassFanout = Broadcast[HttpRequest]
val responseParsingMerge = new ResponseParsingMerge(rootParser)
val terminationFanout = Broadcast[HttpResponse]
val terminationMerge = new TerminationMerge
val requestRendering = Flow[HttpRequest]
.map(RequestRenderingContext(_, remoteAddress))
.section(name("renderer"))(_.transform(() ⇒ requestRendererFactory.newRenderer))
.flatten(FlattenStrategy.concat)
val transportFlow = Flow[ByteString]
.section(name("errorLogger"))(_.transform(() ⇒ errorLogger(log, "Outgoing request stream error")))
.via(transport)
.section(name("rootParser"))(_.transform(() ⇒
// each connection uses a single (private) response parser instance for all its responses
// which builds a cache of all header instances seen on that connection
rootParser.createShallowCopy(requestMethodByPass)))
.splitWhen(_.isInstanceOf[MessageStart])
val methodBypass = Flow[HttpRequest].map(_.method)
import ParserOutput._
val responsePrep = Flow[List[ResponseOutput]]
.transform(recover { case x: ResponseParsingError ⇒ x.error :: Nil }) // FIXME after #16565
.mapConcat(identityFunc)
.splitWhen(x ⇒ x.isInstanceOf[MessageStart] || x == MessageEnd)
.headAndTail
.collect {
case (ResponseStart(statusCode, protocol, headers, createEntity, _), entityParts) ⇒
HttpResponse(statusCode, headers, createEntity(entityParts), protocol)
case (MessageStartError(_, info), _) ⇒ throw IllegalResponseException(info)
}
import FlowGraphImplicits._
Flow() { implicit b ⇒
requestIn ~> methodBypassFanout ~> terminationMerge.requestInput ~> requestRendering ~> transportFlow ~>
responseParsingMerge.dataInput ~> responsePrep ~> terminationFanout ~> responseOut
methodBypassFanout ~> methodBypass ~> responseParsingMerge.methodBypassInput
terminationFanout ~> terminationMerge.terminationBackchannelInput
b.allowCycles()
requestIn -> responseOut
}
}
// FIXME: refactor to a pure-stream design that allows us to get rid of this ad-hoc queue here
class RequestMethodByPass(serverAddress: InetSocketAddress)
extends (HttpRequest ⇒ RequestRenderingContext) with (() ⇒ HttpMethod) {
private[this] var requestMethods = Queue.empty[HttpMethod]
def apply(request: HttpRequest) = {
requestMethods = requestMethods.enqueue(request.method)
RequestRenderingContext(request, serverAddress)
// a simple merge stage that simply forwards its first input and ignores its second input
// (the terminationBackchannelInput), but applies a special completion handling
class TerminationMerge extends FlexiMerge[HttpRequest] {
import FlexiMerge._
val requestInput = createInputPort[HttpRequest]()
val terminationBackchannelInput = createInputPort[HttpResponse]()
def createMergeLogic() = new MergeLogic[HttpRequest] {
override def inputHandles(inputCount: Int) = {
require(inputCount == 2, s"TerminationMerge must have 2 connected inputs, was $inputCount")
Vector(requestInput, terminationBackchannelInput)
}
override def initialState = State[Any](ReadAny(requestInput, terminationBackchannelInput)) {
case (ctx, _, request: HttpRequest) ⇒ { ctx.emit(request); SameState }
case _ ⇒ SameState // simply drop all responses, we are only interested in the completion of the response input
}
override def initialCompletionHandling = CompletionHandling(
onComplete = {
case (ctx, `requestInput`) ⇒ SameState
case (ctx, `terminationBackchannelInput`) ⇒
ctx.complete()
SameState
},
onError = defaultCompletionHandling.onError)
}
def apply(): HttpMethod =
if (requestMethods.nonEmpty) {
val method = requestMethods.head
requestMethods = requestMethods.tail
method
} else HttpResponseParser.NoMethod
}
import ParserOutput._
/**
* A FlexiMerge that follows this logic:
* 1. Wait on the methodBypass for the method of the request corresponding to the next response to be received
* 2. Read from the dataInput until exactly one response has been fully received
* 3. Go back to 1.
*/
class ResponseParsingMerge(rootParser: HttpResponseParser) extends FlexiMerge[List[ResponseOutput]] {
import FlexiMerge._
val dataInput = createInputPort[ByteString]()
val methodBypassInput = createInputPort[HttpMethod]()
def createMergeLogic() = new MergeLogic[List[ResponseOutput]] {
// each connection uses a single (private) response parser instance for all its responses
// which builds a cache of all header instances seen on that connection
val parser = rootParser.createShallowCopy()
var methodBypassCompleted = false
override def inputHandles(inputCount: Int) = {
require(inputCount == 2, s"ResponseParsingMerge must have 2 connected inputs, was $inputCount")
Vector(dataInput, methodBypassInput)
}
override val initialState: State[HttpMethod] =
State(Read(methodBypassInput)) {
case (ctx, _, method) ⇒
parser.setRequestMethodForNextResponse(method)
drainParser(parser.onPush(ByteString.empty), ctx,
onNeedNextMethod = () ⇒ SameState,
onNeedMoreData = () ⇒ {
ctx.changeCompletionHandling(responseReadingCompletionHandling)
responseReadingState
})
}
val responseReadingState: State[ByteString] =
State(Read(dataInput)) {
case (ctx, _, bytes) ⇒
drainParser(parser.onPush(bytes), ctx,
onNeedNextMethod = () ⇒ {
if (methodBypassCompleted) {
ctx.complete()
SameState
} else {
ctx.changeCompletionHandling(initialCompletionHandling)
initialState
}
},
onNeedMoreData = () ⇒ SameState)
}
@tailrec def drainParser(current: ResponseOutput, ctx: MergeLogicContext,
onNeedNextMethod: () ⇒ State[_], onNeedMoreData: () ⇒ State[_],
b: ListBuffer[ResponseOutput] = ListBuffer.empty): State[_] = {
def emit(output: List[ResponseOutput]): Unit = if (output.nonEmpty) ctx.emit(output)
current match {
case NeedNextRequestMethod ⇒
emit(b.result())
onNeedNextMethod()
case StreamEnd ⇒
emit(b.result())
ctx.complete()
SameState
case NeedMoreData ⇒
emit(b.result())
onNeedMoreData()
case x ⇒ drainParser(parser.onPull(), ctx, onNeedNextMethod, onNeedMoreData, b += x)
}
}
override val initialCompletionHandling = CompletionHandling(
onComplete = (ctx, _) ⇒ { ctx.complete(); SameState },
onError = defaultCompletionHandling.onError)
val responseReadingCompletionHandling = CompletionHandling(
onComplete = {
case (ctx, `methodBypassInput`) ⇒
methodBypassCompleted = true
SameState
case (ctx, `dataInput`) ⇒
if (parser.onUpstreamFinish()) {
ctx.complete()
} else {
// not pretty but because the FlexiMerge doesn't let us emit from here (#16565)
// we need to funnel the error through the error channel
ctx.error(new ResponseParsingError(parser.onPull().asInstanceOf[ErrorOutput]))
}
SameState
},
onError = defaultCompletionHandling.onError)
}
}
private class ResponseParsingError(val error: ErrorOutput) extends RuntimeException
// TODO: remove after #16394 is cleared
def recover[A, B >: A](pf: PartialFunction[Throwable, B]): () ⇒ PushPullStage[A, B] = {
val stage = new PushPullStage[A, B] {
var recovery: Option[B] = None
def onPush(elem: A, ctx: Context[B]): Directive = ctx.push(elem)
def onPull(ctx: Context[B]): Directive = recovery match {
case None ⇒ ctx.pull()
case Some(x) ⇒ { recovery = null; ctx.push(x) }
case null ⇒ ctx.finish()
}
override def onUpstreamFailure(cause: Throwable, ctx: Context[B]): TerminationDirective =
if (pf isDefinedAt cause) {
recovery = Some(pf(cause))
ctx.absorbTermination()
} else super.onUpstreamFailure(cause, ctx)
}
() ⇒ stage
}
}


@ -5,7 +5,6 @@
package akka.http.engine.parsing
import scala.annotation.tailrec
import scala.collection.mutable.ListBuffer
import akka.event.LoggingAdapter
import akka.parboiled2.CharPredicate
import akka.stream.scaladsl.Source
@ -55,8 +54,7 @@ private[http] final class BodyPartParser(defaultContentType: ContentType,
if (illegalHeaderWarnings) log.warning(errorInfo.withSummaryPrepended("Illegal multipart header").formatPretty)
}
private[this] val result = new ListBuffer[Output] // transformer op is currently optimized for LinearSeqs
private[this] var resultIterator: Iterator[Output] = Iterator.empty
private[this] var output = collection.immutable.Queue.empty[Output]
private[this] var state: ByteString ⇒ StateResult = tryParseInitialBoundary
private[this] var receivedInitialBoundary = false
private[this] var terminated = false
@ -65,7 +63,6 @@ private[http] final class BodyPartParser(defaultContentType: ContentType,
if (illegalHeaderWarnings) log.warning(errorInfo.withSummaryPrepended("Illegal multipart header").formatPretty)
override def onPush(input: ByteString, ctx: Context[Output]): Directive = {
result.clear()
try state(input)
catch {
case e: ParsingException ⇒ fail(e.info)
@ -73,15 +70,14 @@ private[http] final class BodyPartParser(defaultContentType: ContentType,
// we are missing a try/catch{continue} wrapper somewhere
throw new IllegalStateException("unexpected NotEnoughDataException", NotEnoughDataException)
}
resultIterator = result.iterator
if (resultIterator.hasNext) ctx.push(resultIterator.next())
if (output.nonEmpty) ctx.push(dequeue())
else if (!terminated) ctx.pull()
else ctx.finish()
}
override def onPull(ctx: Context[Output]): Directive = {
if (resultIterator.hasNext)
ctx.push(resultIterator.next())
if (output.nonEmpty)
ctx.push(dequeue())
else if (ctx.isFinishing) {
if (terminated || !receivedInitialBoundary)
ctx.finish()
@ -203,7 +199,13 @@ private[http] final class BodyPartParser(defaultContentType: ContentType,
def emit(bytes: ByteString): Unit = if (bytes.nonEmpty) emit(EntityPart(bytes))
def emit(output: Output): Unit = result += output
def emit(element: Output): Unit = output = output.enqueue(element)
def dequeue(): Output = {
val head = output.head
output = output.tail
head
}
def continue(input: ByteString, offset: Int)(next: (ByteString, Int) ⇒ StateResult): StateResult = {
state =


@ -402,7 +402,7 @@ private[http] object HttpHeaderParser {
"Cache-Control: no-cache",
"Expect: 100-continue")
def apply(settings: HttpHeaderParser.Settings)(warnOnIllegalHeader: ErrorInfo ⇒ Unit = info ⇒ throw new IllegalHeaderException(info)) =
def apply(settings: HttpHeaderParser.Settings)(warnOnIllegalHeader: ErrorInfo ⇒ Unit = info ⇒ throw IllegalHeaderException(info)) =
prime(unprimed(settings, warnOnIllegalHeader))
def unprimed(settings: HttpHeaderParser.Settings, warnOnIllegalHeader: ErrorInfo ⇒ Unit) =


@ -10,7 +10,6 @@ import akka.parboiled2.CharUtils
import akka.util.ByteString
import akka.stream.scaladsl.Source
import akka.stream.stage._
import akka.http.Http.StreamException
import akka.http.model.parser.CharacterClasses
import akka.http.util._
import akka.http.model._
@ -22,21 +21,33 @@ import ParserOutput._
* INTERNAL API
*/
private[http] abstract class HttpMessageParser[Output >: MessageOutput <: ParserOutput](val settings: ParserSettings,
val headerParser: HttpHeaderParser)
extends PushPullStage[ByteString, Output] {
val headerParser: HttpHeaderParser) { self ⇒
import HttpMessageParser._
import settings._
sealed trait StateResult // phantom type for ensuring soundness of our parsing method setup
final case class Trampoline(f: ByteString ⇒ StateResult) extends StateResult
private[this] val result = new ListBuffer[Output]
private[this] var state: ByteString ⇒ StateResult = startNewMessage(_, 0)
private[this] var protocol: HttpProtocol = `HTTP/1.1`
private[this] var completionHandling: CompletionHandling = CompletionOk
private[this] var terminated = false
override def onPush(input: ByteString, ctx: Context[Output]): Directive = {
def isTerminated = terminated
val stage: PushPullStage[ByteString, Output] =
new PushPullStage[ByteString, Output] {
def onPush(elem: ByteString, ctx: Context[Output]) = handleParserOutput(self.onPush(elem), ctx)
def onPull(ctx: Context[Output]) = handleParserOutput(self.onPull(), ctx)
private def handleParserOutput(output: Output, ctx: Context[Output]): Directive =
output match {
case StreamEnd ⇒ ctx.finish()
case NeedMoreData ⇒ ctx.pull()
case x ⇒ ctx.push(x)
}
override def onUpstreamFinish(ctx: Context[Output]): TerminationDirective =
if (self.onUpstreamFinish()) ctx.finish() else ctx.absorbTermination()
}
final def onPush(input: ByteString): Output = {
@tailrec def run(next: ByteString ⇒ StateResult): StateResult =
(try next(input)
catch {
@ -51,37 +62,32 @@ private[http] abstract class HttpMessageParser[Output >: MessageOutput <: Parser
if (result.nonEmpty) throw new IllegalStateException("Unexpected `onPush`")
run(state)
pushResultHeadAndFinishOrPull(ctx)
onPull()
}
def onPull(ctx: Context[Output]): Directive = pushResultHeadAndFinishOrPull(ctx)
def pushResultHeadAndFinishOrPull(ctx: Context[Output]): Directive =
final def onPull(): Output =
if (result.nonEmpty) {
val head = result.head
result.remove(0) // faster than `ListBuffer::drop`
ctx.push(head)
} else if (terminated) ctx.finish() else ctx.pull()
head
} else if (terminated) StreamEnd else NeedMoreData
override def onUpstreamFinish(ctx: Context[Output]) = {
final def onUpstreamFinish(): Boolean = {
completionHandling() match {
case Some(x) ⇒ emit(x.asInstanceOf[Output])
case Some(x) ⇒ emit(x)
case None ⇒ // nothing to do
}
terminated = true
if (result.isEmpty) ctx.finish() else ctx.absorbTermination()
result.isEmpty
}
def startNewMessage(input: ByteString, offset: Int): StateResult = {
def _startNewMessage(input: ByteString, offset: Int): StateResult =
try parseMessage(input, offset)
catch { case NotEnoughDataException ⇒ continue(input, offset)(_startNewMessage) }
protected final def startNewMessage(input: ByteString, offset: Int): StateResult = {
if (offset < input.length) setCompletionHandling(CompletionIsMessageStartError)
_startNewMessage(input, offset)
try parseMessage(input, offset)
catch { case NotEnoughDataException ⇒ continue(input, offset)(startNewMessage) }
}
def parseMessage(input: ByteString, offset: Int): StateResult
protected def parseMessage(input: ByteString, offset: Int): StateResult
def parseProtocol(input: ByteString, cursor: Int): Int = {
def c(ix: Int) = byteChar(input, cursor + ix)
@ -204,7 +210,7 @@ private[http] abstract class HttpMessageParser[Output >: MessageOutput <: Parser
val chunkBodyEnd = cursor + chunkSize
def result(terminatorLen: Int) = {
emit(EntityChunk(HttpEntity.Chunk(input.slice(cursor, chunkBodyEnd), extension)))
trampoline(_ ⇒ parseChunk(input, chunkBodyEnd + terminatorLen, isLastMessage))
Trampoline(_ ⇒ parseChunk(input, chunkBodyEnd + terminatorLen, isLastMessage))
}
byteChar(input, chunkBodyEnd) match {
case '\r' if byteChar(input, chunkBodyEnd + 1) == '\n' ⇒ result(2)
@ -255,7 +261,6 @@ private[http] abstract class HttpMessageParser[Output >: MessageOutput <: Parser
state = next(_, 0)
done()
}
def trampoline(next: ByteString ⇒ StateResult): StateResult = Trampoline(next)
def failMessageStart(summary: String): StateResult = failMessageStart(summary, "")
def failMessageStart(summary: String, detail: String): StateResult = failMessageStart(StatusCodes.BadRequest, summary, detail)
@ -299,7 +304,7 @@ private[http] abstract class HttpMessageParser[Output >: MessageOutput <: Parser
transformData: Source[ByteString] ⇒ Source[ByteString] = identityFunc)(entityParts: Source[_ <: ParserOutput]): UniversalEntity = {
val data = entityParts.collect {
case EntityPart(bytes) ⇒ bytes
case EntityStreamError(info) ⇒ throw new StreamException(info)
case EntityStreamError(info) ⇒ throw EntityStreamException(info)
}
HttpEntity.Default(contentType(cth), contentLength, transformData(data))
}
@ -308,7 +313,7 @@ private[http] abstract class HttpMessageParser[Output >: MessageOutput <: Parser
transformChunks: Source[HttpEntity.ChunkStreamPart] ⇒ Source[HttpEntity.ChunkStreamPart] = identityFunc)(entityChunks: Source[_ <: ParserOutput]): RequestEntity = {
val chunks = entityChunks.collect {
case EntityChunk(chunk) ⇒ chunk
case EntityStreamError(info) ⇒ throw new StreamException(info)
case EntityStreamError(info) ⇒ throw EntityStreamException(info)
}
HttpEntity.Chunked(contentType(cth), transformChunks(chunks))
}
@ -324,7 +329,10 @@ private[http] abstract class HttpMessageParser[Output >: MessageOutput <: Parser
}
private[http] object HttpMessageParser {
type CompletionHandling = () ⇒ Option[ParserOutput]
sealed trait StateResult // phantom type for ensuring soundness of our parsing method setup
final case class Trampoline(f: ByteString ⇒ StateResult) extends StateResult
type CompletionHandling = () ⇒ Option[ErrorOutput]
val CompletionOk: CompletionHandling = () ⇒ None
val CompletionIsMessageStartError: CompletionHandling =
() ⇒ Some(ParserOutput.MessageStartError(StatusCodes.BadRequest, ErrorInfo("Illegal HTTP message start")))


@ -11,9 +11,9 @@ import akka.stream.scaladsl.OperationAttributes._
import akka.stream.stage.{ Context, PushPullStage }
import akka.stream.scaladsl.Source
import akka.util.ByteString
import akka.http.engine.server.OneHundredContinue
import akka.http.model.parser.CharacterClasses
import akka.http.util.identityFunc
import akka.http.engine.TokenSourceActor
import akka.http.model._
import headers._
import StatusCodes._
@ -27,6 +27,7 @@ private[http] class HttpRequestParser(_settings: ParserSettings,
_headerParser: HttpHeaderParser,
oneHundredContinueRef: () ⇒ Option[ActorRef] = () ⇒ None)
extends HttpMessageParser[RequestOutput](_settings, _headerParser) {
import HttpMessageParser._
import settings._
private[this] var method: HttpMethod = _
@ -105,7 +106,7 @@ private[http] class HttpRequestParser(_settings: ParserSettings,
uriBytes = input.iterator.slice(uriStart, uriEnd).toArray[Byte] // TODO: can we reduce allocations here?
uri = Uri.parseHttpRequestTarget(uriBytes, mode = uriParsingMode)
} catch {
case e: IllegalUriException ⇒ throw new ParsingException(BadRequest, e.info)
case IllegalUriException(info) ⇒ throw new ParsingException(BadRequest, info)
}
uriEnd + 1
}
@ -133,7 +134,7 @@ private[http] class HttpRequestParser(_settings: ParserSettings,
def onPull(ctx: Context[T]) = {
if (!oneHundredContinueSent) {
val ref = oneHundredContinueRef().getOrElse(throw new IllegalStateException("oneHundredContinueRef unavailable"))
ref ! OneHundredContinue
ref ! TokenSourceActor.Trigger
oneHundredContinueSent = true
}
ctx.pull()


@ -10,38 +10,41 @@ import akka.stream.scaladsl.Source
import akka.util.ByteString
import akka.http.model._
import headers._
import HttpResponseParser.NoMethod
import ParserOutput._
/**
* INTERNAL API
*/
private[http] class HttpResponseParser(_settings: ParserSettings,
_headerParser: HttpHeaderParser,
dequeueRequestMethodForNextResponse: () ⇒ HttpMethod = () ⇒ NoMethod)
private[http] class HttpResponseParser(_settings: ParserSettings, _headerParser: HttpHeaderParser)
extends HttpMessageParser[ResponseOutput](_settings, _headerParser) {
import HttpMessageParser._
import settings._
private[this] var requestMethodForCurrentResponse: HttpMethod = NoMethod
private[this] var requestMethodForCurrentResponse: Option[HttpMethod] = None
private[this] var statusCode: StatusCode = StatusCodes.OK
def createShallowCopy(dequeueRequestMethodForNextResponse: () ⇒ HttpMethod): HttpResponseParser =
new HttpResponseParser(settings, headerParser.createShallowCopy(), dequeueRequestMethodForNextResponse)
def createShallowCopy(): HttpResponseParser = new HttpResponseParser(settings, headerParser.createShallowCopy())
override def startNewMessage(input: ByteString, offset: Int): StateResult = {
requestMethodForCurrentResponse = dequeueRequestMethodForNextResponse()
super.startNewMessage(input, offset)
}
def setRequestMethodForNextResponse(requestMethod: HttpMethod): Unit =
if (requestMethodForCurrentResponse.isEmpty) requestMethodForCurrentResponse = Some(requestMethod)
def parseMessage(input: ByteString, offset: Int): StateResult =
if (requestMethodForCurrentResponse ne NoMethod) {
protected def parseMessage(input: ByteString, offset: Int): StateResult =
if (requestMethodForCurrentResponse.isDefined) {
var cursor = parseProtocol(input, offset)
if (byteChar(input, cursor) == ' ') {
cursor = parseStatusCode(input, cursor + 1)
cursor = parseReason(input, cursor)()
parseHeaderLines(input, cursor)
} else badProtocol
} else failMessageStart("Unexpected server response", input.drop(offset).utf8String)
} else {
emit(NeedNextRequestMethod)
done()
}
override def emit(output: ResponseOutput): Unit = {
if (output == MessageEnd) requestMethodForCurrentResponse = None
super.emit(output)
}
def badProtocol = throw new ParsingException("The server-side HTTP version is not supported")
@ -81,10 +84,11 @@ private[http] class HttpResponseParser(_settings: ParserSettings,
def finishEmptyResponse() = {
emitResponseStart(emptyEntity(cth))
setCompletionHandling(HttpMessageParser.CompletionOk)
emit(MessageEnd)
startNewMessage(input, bodyStart)
}
if (statusCode.allowsEntity && (requestMethodForCurrentResponse ne HttpMethods.HEAD)) {
if (statusCode.allowsEntity && (requestMethodForCurrentResponse.get != HttpMethods.HEAD)) {
teh match {
case None ⇒ clh match {
case Some(`Content-Length`(contentLength)) ⇒
@ -95,6 +99,7 @@ private[http] class HttpResponseParser(_settings: ParserSettings,
val cl = contentLength.toInt
emitResponseStart(strictEntity(cth, input, bodyStart, cl))
setCompletionHandling(HttpMessageParser.CompletionOk)
emit(MessageEnd)
startNewMessage(input, bodyStart + cl)
} else {
emitResponseStart(defaultEntity(cth, contentLength))
@ -128,11 +133,4 @@ private[http] class HttpResponseParser(_settings: ParserSettings,
emit(EntityPart(input drop bodyStart))
continue(parseToCloseBody)
}
}
/**
* INTERNAL API
*/
private[http] object HttpResponseParser {
val NoMethod = HttpMethod.custom("NONE", safe = false, idempotent = false, entityAccepted = false)
}


@ -21,6 +21,7 @@ private[http] object ParserOutput {
sealed trait ResponseOutput extends ParserOutput
sealed trait MessageStart extends ParserOutput
sealed trait MessageOutput extends RequestOutput with ResponseOutput
sealed trait ErrorOutput extends MessageOutput
final case class RequestStart(
method: HttpMethod,
@ -44,7 +45,15 @@ private[http] object ParserOutput {
final case class EntityChunk(chunk: HttpEntity.ChunkStreamPart) extends MessageOutput
final case class MessageStartError(status: StatusCode, info: ErrorInfo) extends MessageStart with MessageOutput
final case class MessageStartError(status: StatusCode, info: ErrorInfo) extends MessageStart with ErrorOutput
final case class EntityStreamError(info: ErrorInfo) extends MessageOutput
final case class EntityStreamError(info: ErrorInfo) extends ErrorOutput
//////////// meta messages ///////////
case object StreamEnd extends MessageOutput
case object NeedMoreData extends MessageOutput
case object NeedNextRequestMethod extends ResponseOutput
}


@ -75,13 +75,13 @@ private object RenderSupport {
override def onPush(elem: ByteString, ctx: Context[ByteString]): Directive = {
sent += elem.length
if (sent > length)
throw new InvalidContentLengthException(s"HTTP message had declared Content-Length $length but entity data stream amounts to more bytes")
throw InvalidContentLengthException(s"HTTP message had declared Content-Length $length but entity data stream amounts to more bytes")
ctx.push(elem)
}
override def onUpstreamFinish(ctx: Context[ByteString]): TerminationDirective = {
if (sent < length)
throw new InvalidContentLengthException(s"HTTP message had declared Content-Length $length but entity data stream amounts to ${length - sent} bytes less")
throw InvalidContentLengthException(s"HTTP message had declared Content-Length $length but entity data stream amounts to ${length - sent} bytes less")
ctx.finish()
}


@ -4,10 +4,10 @@
package akka.http.engine.server
import scala.util.control.NonFatal
import akka.actor.{ ActorRef, Props }
import akka.util.ByteString
import akka.event.LoggingAdapter
import akka.stream.stage.PushPullStage
import akka.stream.scaladsl.OperationAttributes._
import akka.stream.FlattenStrategy
import akka.stream.scaladsl._
@ -15,11 +15,9 @@ import akka.stream.stage.PushPullStage
import akka.http.engine.parsing.{ HttpHeaderParser, HttpRequestParser }
import akka.http.engine.rendering.{ ResponseRenderingContext, HttpResponseRendererFactory }
import akka.http.engine.parsing.ParserOutput._
import akka.http.engine.TokenSourceActor
import akka.http.model._
import akka.http.util._
import akka.http.Http
import scala.util.control.NonFatal
/**
* INTERNAL API
@ -44,7 +42,7 @@ private[http] object HttpServer {
@volatile var oneHundredContinueRef: Option[ActorRef] = None // FIXME: unnecessary after fixing #16168
val oneHundredContinueSource = Source[OneHundredContinue.type] {
Props {
val actor = new OneHundredContinueSourceActor
val actor = new TokenSourceActor(OneHundredContinue)
oneHundredContinueRef = Some(actor.context.self)
actor
}
@ -56,7 +54,7 @@ private[http] object HttpServer {
val requestParsing = Flow[ByteString].section(name("rootParser"))(_.transform(() ⇒
// each connection uses a single (private) request parser instance for all its requests
// which builds a cache of all header instances seen on that connection
rootParser.createShallowCopy(() ⇒ oneHundredContinueRef)))
rootParser.createShallowCopy(() ⇒ oneHundredContinueRef).stage))
val requestPreparation =
Flow[RequestOutput]
@ -68,6 +66,15 @@ private[http] object HttpServer {
val effectiveMethod = if (method == HttpMethods.HEAD && settings.transparentHeadRequests) HttpMethods.GET else method
HttpRequest(effectiveMethod, effectiveUri, headers, createEntity(entityParts), protocol)
}
.take(Int.MaxValue) // FIXME: removing this makes the akka.http.engine.server.HttpServerSpec fail!
// we need to make sure that only one element per incoming request is queueing up in front of
// the bypassMerge.bypassInput. Otherwise the rising backpressure against the bypassFanout
// would eventually prevent us from reading the remaining request chunks from the transportIn
val bypass = Flow[RequestOutput].filter {
case (_: RequestStart | _: MessageStartError) ⇒ true
case _ ⇒ false
}
val rendererPipeline =
Flow[ResponseRenderingContext]
@ -84,7 +91,7 @@ private[http] object HttpServer {
Flow() { implicit b ⇒
//FIXME: the graph is unnecessary after fixing #15957
transportIn ~> requestParsing ~> bypassFanout ~> requestPreparation ~> serverFlow ~> bypassMerge.applicationInput ~> rendererPipeline ~> transportOut
bypassFanout ~> bypassMerge.bypassInput
bypassFanout ~> bypass ~> bypassMerge.bypassInput
oneHundredContinueSource ~> bypassMerge.oneHundredContinueInput
b.allowCycles()
@ -101,18 +108,25 @@ private[http] object HttpServer {
val applicationInput = createInputPort[HttpResponse]()
def createMergeLogic() = new MergeLogic[ResponseRenderingContext] {
var requestStart: RequestStart = _
override def inputHandles(inputCount: Int) = {
require(inputCount == 3, s"BypassMerge must have 3 connected inputs, was $inputCount")
Vector(bypassInput, oneHundredContinueInput, applicationInput)
}
override val initialState = State[Any](Read(bypassInput)) {
case (ctx, _, requestStart: RequestStart) ⇒ waitingForApplicationResponse(requestStart)
override val initialState: State[Any] = State[Any](Read(bypassInput)) {
case (ctx, _, requestStart: RequestStart) ⇒
this.requestStart = requestStart
ctx.changeCompletionHandling(waitingForApplicationResponseCompletionHandling)
waitingForApplicationResponse
case (ctx, _, MessageStartError(status, info)) ⇒ finishWithError(ctx, "request", status, info)
case _ ⇒ SameState // drop other parser output
case _ ⇒ throw new IllegalStateException
}
def waitingForApplicationResponse(requestStart: RequestStart): State[Any] =
override val initialCompletionHandling = eagerClose
val waitingForApplicationResponse =
State[Any](ReadAny(oneHundredContinueInput, applicationInput)) {
case (ctx, _, response: HttpResponse) ⇒
// see the comment on [[OneHundredContinue]] for an explanation of the closing logic here (and more)
@ -123,18 +137,20 @@ private[http] object HttpServer {
case (ctx, _, OneHundredContinue) ⇒
assert(requestStart.expect100ContinueResponsePending)
ctx.emit(ResponseRenderingContext(HttpResponse(StatusCodes.Continue)))
waitingForApplicationResponse(requestStart.copy(expect100ContinueResponsePending = false))
requestStart = requestStart.copy(expect100ContinueResponsePending = false)
SameState
}
override def initialCompletionHandling = CompletionHandling(
onComplete = (ctx, _) ⇒ { ctx.complete(); SameState },
val waitingForApplicationResponseCompletionHandling = CompletionHandling(
onComplete = {
case (ctx, `bypassInput`) ⇒ { requestStart = requestStart.copy(closeAfterResponseCompletion = true); SameState }
case (ctx, _) ⇒ { ctx.complete(); SameState }
},
onError = {
case (ctx, _, error: Http.StreamException) ⇒
case (ctx, _, EntityStreamException(errorInfo)) ⇒
// the application has forwarded a request entity stream error to the response stream
finishWithError(ctx, "request", StatusCodes.BadRequest, error.info)
case (ctx, _, error) ⇒
ctx.error(error)
SameState
finishWithError(ctx, "request", StatusCodes.BadRequest, errorInfo)
case (ctx, _, error) ⇒ { ctx.error(error); SameState }
})
def finishWithError(ctx: MergeLogicContext, target: String, status: StatusCode, info: ErrorInfo): State[Any] = {
@ -150,27 +166,63 @@ private[http] object HttpServer {
}
}
}
}
private[server] class ErrorsTo500ResponseRecovery(log: LoggingAdapter)
extends PushPullStage[ResponseRenderingContext, ResponseRenderingContext] {
import akka.stream.stage.Context
/**
* The `Expect: 100-continue` header has a special status in HTTP.
* It allows the client to send an `Expect: 100-continue` header with the request and then pause request sending
* (i.e. hold back sending the request entity). The server reads the request headers, determines whether it wants to
* accept the request and responds with
*
* - `417 Expectation Failed`, if it doesn't support the `100-continue` expectation
* (or if the `Expect` header contains other, unsupported expectations).
* - a `100 Continue` response,
* if it is ready to accept the request entity and the client should go ahead with sending it
* - a final response (like a 4xx to signal some client-side error
* (e.g. if the request entity length is beyond the configured limit) or a 3xx redirect)
*
* Only if the client receives a `100 Continue` response from the server is it allowed to continue sending the request
* entity. In this case it will receive another response after having completed request sending.
* So this special feature breaks the normal "one request - one response" logic of HTTP!
* It therefore requires special handling in all HTTP stacks (client- and server-side).
*
* For us this means:
*
* - on the server-side:
* After having read a `Expect: 100-continue` header with the request we package up an `HttpRequest` instance and send
* it through to the application. Only when (and if) the application then requests data from the entity stream do we
* send out a `100 Continue` response and continue reading the request entity.
* The application can therefore determine itself whether it wants the client to send the request entity
* by deciding whether to look at the request entity data stream or not.
* If the application sends a response *without* having looked at the request entity the client receives this
* response *instead of* the `100 Continue` response and the server closes the connection afterwards.
*
* - on the client-side:
* If the user adds a `Expect: 100-continue` header to the request we need to hold back sending the entity until
* we've received a `100 Continue` response.
*/
case object OneHundredContinue
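// Editor's sketch (not part of this PR): the `Expect: 100-continue` exchange described above.
//
//   Client:  PUT /upload HTTP/1.1
//            Expect: 100-continue
//            Content-Length: 100000          (entity bytes are held back)
//   Server:  HTTP/1.1 100 Continue           (sent only once the application pulls the entity stream)
//   Client:  <100000 entity bytes>
//   Server:  HTTP/1.1 200 OK                 (final response)
//
// A hypothetical application-level handler (`isAcceptable` is a made-up predicate): responding
// without ever touching `request.entity` suppresses the `100 Continue` and the connection is
// closed after the response; consuming the entity data stream is what triggers the `100 Continue`.
//
//   def handle(request: HttpRequest): HttpResponse =
//     if (!isAcceptable(request)) HttpResponse(StatusCodes.ExpectationFailed)
//     else HttpResponse(entity = "received") // produced after draining request.entity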
private[this] var errorResponse: ResponseRenderingContext = _
final class ErrorsTo500ResponseRecovery(log: LoggingAdapter)
extends PushPullStage[ResponseRenderingContext, ResponseRenderingContext] {
override def onPush(elem: ResponseRenderingContext, ctx: Context[ResponseRenderingContext]) = ctx.push(elem)
import akka.stream.stage.Context
override def onPull(ctx: Context[ResponseRenderingContext]) =
if (ctx.isFinishing) ctx.pushAndFinish(errorResponse)
else ctx.pull()
private[this] var errorResponse: ResponseRenderingContext = _
override def onUpstreamFailure(error: Throwable, ctx: Context[ResponseRenderingContext]) =
error match {
case NonFatal(e) ⇒
log.error(e, "Internal server error, sending 500 response")
errorResponse = ResponseRenderingContext(HttpResponse(StatusCodes.InternalServerError),
closeAfterResponseCompletion = true)
ctx.absorbTermination()
case _ ⇒ ctx.fail(error)
}
}
override def onPush(elem: ResponseRenderingContext, ctx: Context[ResponseRenderingContext]) = ctx.push(elem)
override def onPull(ctx: Context[ResponseRenderingContext]) =
if (ctx.isFinishing) ctx.pushAndFinish(errorResponse)
else ctx.pull()
override def onUpstreamFailure(error: Throwable, ctx: Context[ResponseRenderingContext]) =
error match {
case NonFatal(e) ⇒
log.error(e, "Internal server error, sending 500 response")
errorResponse = ResponseRenderingContext(HttpResponse(StatusCodes.InternalServerError),
closeAfterResponseCompletion = true)
ctx.absorbTermination()
case _ ⇒ ctx.fail(error)
}
}
}


@ -1,66 +0,0 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.http.engine.server
import scala.annotation.tailrec
import akka.stream.actor.{ ActorPublisherMessage, ActorPublisher }
/**
* The `Expect: 100-continue` header has a special status in HTTP.
* It allows the client to send an `Expect: 100-continue` header with the request and then pause request sending
* (i.e. hold back sending the request entity). The server reads the request headers, determines whether it wants to
* accept the request and responds with
*
* - `417 Expectation Failed`, if it doesn't support the `100-continue` expectation
* (or if the `Expect` header contains other, unsupported expectations).
* - a `100 Continue` response,
* if it is ready to accept the request entity and the client should go ahead with sending it
* - a final response (like a 4xx to signal some client-side error
* (e.g. if the request entity length is beyond the configured limit) or a 3xx redirect)
*
* Only if the client receives a `100 Continue` response from the server is it allowed to continue sending the request
* entity. In this case it will receive another response after having completed request sending.
* So this special feature breaks the normal "one request - one response" logic of HTTP!
* It therefore requires special handling in all HTTP stacks (client- and server-side).
*
* For us this means:
*
* - on the server-side:
* After having read a `Expect: 100-continue` header with the request we package up an `HttpRequest` instance and send
* it through to the application. Only when (and if) the application then requests data from the entity stream do we
* send out a `100 Continue` response and continue reading the request entity.
* The application can therefore determine itself whether it wants the client to send the request entity
* by deciding whether to look at the request entity data stream or not.
* If the application sends a response *without* having looked at the request entity the client receives this
* response *instead of* the `100 Continue` response and the server closes the connection afterwards.
*
* - on the client-side:
* If the user adds a `Expect: 100-continue` header to the request we need to hold back sending the entity until
* we've received a `100 Continue` response.
*/
private[engine] case object OneHundredContinue
private[engine] class OneHundredContinueSourceActor extends ActorPublisher[OneHundredContinue.type] {
private var triggered = 0
def receive = {
case OneHundredContinue ⇒
triggered += 1
tryDispatch()
case ActorPublisherMessage.Request(_) ⇒
tryDispatch()
case ActorPublisherMessage.Cancel ⇒
context.stop(self)
}
@tailrec private def tryDispatch(): Unit =
if (triggered > 0 && totalDemand > 0) {
onNext(OneHundredContinue)
triggered -= 1
tryDispatch()
}
}


@ -4,7 +4,7 @@
package akka.http.model
import StatusCodes.{ ClientError, ServerError }
import StatusCodes.ClientError
/**
* Two-level model of error information.
@ -33,34 +33,43 @@ object ErrorInfo {
}
/** Marker for exceptions that provide an ErrorInfo */
abstract case class ExceptionWithErrorInfo(info: ErrorInfo) extends RuntimeException(info.formatPretty)
abstract class ExceptionWithErrorInfo(info: ErrorInfo) extends RuntimeException(info.formatPretty)
class IllegalUriException(info: ErrorInfo) extends ExceptionWithErrorInfo(info) {
def this(summary: String, detail: String = "") = this(ErrorInfo(summary, detail))
case class IllegalUriException(info: ErrorInfo) extends ExceptionWithErrorInfo(info)
object IllegalUriException {
def apply(summary: String, detail: String = ""): IllegalUriException = apply(ErrorInfo(summary, detail))
}
class IllegalHeaderException(info: ErrorInfo) extends ExceptionWithErrorInfo(info) {
def this(summary: String, detail: String = "") = this(ErrorInfo(summary, detail))
case class IllegalHeaderException(info: ErrorInfo) extends ExceptionWithErrorInfo(info)
object IllegalHeaderException {
def apply(summary: String, detail: String = ""): IllegalHeaderException = apply(ErrorInfo(summary, detail))
}
class InvalidContentLengthException(info: ErrorInfo) extends ExceptionWithErrorInfo(info) {
def this(summary: String, detail: String = "") = this(ErrorInfo(summary, detail))
case class InvalidContentLengthException(info: ErrorInfo) extends ExceptionWithErrorInfo(info)
object InvalidContentLengthException {
def apply(summary: String, detail: String = ""): InvalidContentLengthException = apply(ErrorInfo(summary, detail))
}
class ParsingException(info: ErrorInfo) extends ExceptionWithErrorInfo(info) {
def this(summary: String, detail: String = "") = this(ErrorInfo(summary, detail))
case class ParsingException(info: ErrorInfo) extends ExceptionWithErrorInfo(info)
object ParsingException {
def apply(summary: String, detail: String = ""): ParsingException = apply(ErrorInfo(summary, detail))
}
class IllegalRequestException private (info: ErrorInfo, val status: ClientError)
extends ExceptionWithErrorInfo(info) {
def this(status: ClientError) = this(ErrorInfo(status.defaultMessage), status)
def this(status: ClientError, info: ErrorInfo) = this(info.withFallbackSummary(status.defaultMessage), status)
def this(status: ClientError, detail: String) = this(ErrorInfo(status.defaultMessage, detail), status)
case class IllegalRequestException(info: ErrorInfo, status: ClientError) extends ExceptionWithErrorInfo(info)
object IllegalRequestException {
def apply(status: ClientError): IllegalRequestException = apply(ErrorInfo(status.defaultMessage), status)
def apply(status: ClientError, info: ErrorInfo): IllegalRequestException = apply(info.withFallbackSummary(status.defaultMessage), status)
def apply(status: ClientError, detail: String): IllegalRequestException = apply(ErrorInfo(status.defaultMessage, detail), status)
}
class RequestProcessingException private (info: ErrorInfo, val status: ServerError)
extends ExceptionWithErrorInfo(info) {
def this(status: ServerError) = this(ErrorInfo(status.defaultMessage), status)
def this(status: ServerError, info: ErrorInfo) = this(info.withFallbackSummary(status.defaultMessage), status)
def this(status: ServerError, detail: String) = this(ErrorInfo(status.defaultMessage, detail), status)
}
case class IllegalResponseException(info: ErrorInfo) extends ExceptionWithErrorInfo(info)
object IllegalResponseException {
def apply(summary: String, detail: String = ""): IllegalResponseException = apply(ErrorInfo(summary, detail))
}
case class EntityStreamException(info: ErrorInfo) extends ExceptionWithErrorInfo(info)
object EntityStreamException {
def apply(summary: String, detail: String = ""): EntityStreamException = apply(ErrorInfo(summary, detail))
}
case class RequestTimeoutException(request: HttpRequest, message: String) extends RuntimeException(message)


@ -270,7 +270,7 @@ object HttpRequest {
val hostHeader = headers.collectFirst { case x: Host ⇒ x }
if (uri.isRelative) {
def fail(detail: String) =
throw new IllegalUriException(s"Cannot establish effective URI of request to `$uri`, request has a relative URI and $detail")
throw IllegalUriException(s"Cannot establish effective URI of request to `$uri`, request has a relative URI and $detail")
val Host(host, port) = hostHeader match {
case None ⇒ if (defaultHostHeader.isEmpty) fail("is missing a `Host` header") else defaultHostHeader
case Some(x) if x.isEmpty ⇒ if (defaultHostHeader.isEmpty) fail("an empty `Host` header") else defaultHostHeader
@ -280,7 +280,7 @@ object HttpRequest {
} else // http://tools.ietf.org/html/rfc7230#section-5.4
if (hostHeader.isEmpty || uri.authority.isEmpty && hostHeader.get.isEmpty ||
hostHeader.get.host.equalsIgnoreCase(uri.authority.host)) uri
else throw new IllegalUriException(s"'Host' header value of request to `$uri` doesn't match request target authority",
else throw IllegalUriException(s"'Host' header value of request to `$uri` doesn't match request target authority",
s"Host header: $hostHeader\nrequest target authority: ${uri.authority}")
}


@ -108,13 +108,13 @@ object Multipart {
val params = dispositionParams
params.get("name") match {
case Some(name) ⇒ Success(f(name, params - "name", headers.filterNot(_ is "content-disposition")))
case None ⇒ Failure(new IllegalHeaderException("multipart/form-data part must contain `Content-Disposition` header with `name` parameter"))
case None ⇒ Failure(IllegalHeaderException("multipart/form-data part must contain `Content-Disposition` header with `name` parameter"))
}
}
private[BodyPart] def tryCreateByteRangesBodyPart[T](f: (ContentRange, RangeUnit, immutable.Seq[HttpHeader]) ⇒ T): Try[T] =
headers.collectFirst { case x: `Content-Range` ⇒ x } match {
case Some(`Content-Range`(unit, range)) ⇒ Success(f(range, unit, headers.filterNot(_ is "content-range")))
case None ⇒ Failure(new IllegalHeaderException("multipart/byteranges part must contain `Content-Range` header"))
case None ⇒ Failure(IllegalHeaderException("multipart/byteranges part must contain `Content-Range` header"))
}
}
object BodyPart {


@ -723,8 +723,7 @@ object Uri {
if (hasDotOrDotDotSegment(path)) process(path) else path
}
private[http] def fail(summary: String, detail: String = "") =
throw new IllegalUriException(summary, detail)
private[http] def fail(summary: String, detail: String = "") = throw IllegalUriException(summary, detail)
private[http] def create(scheme: String, userinfo: String, host: Host, port: Int, path: Path, query: Query,
fragment: Option[String]): Uri =


@ -401,7 +401,7 @@ private[parser] trait CommonRules { this: Parser with StringBuilding ⇒
private def createDateTime(year: Int, month: Int, day: Int, hour: Int, min: Int, sec: Int, wkday: Int) = {
val dt = DateTime(year, month, day, hour, min, sec)
if (dt.weekday != wkday)
throw new ParsingException(s"Illegal weekday in date $dt: is '${DateTime.weekday(wkday)}' but " +
throw ParsingException(s"Illegal weekday in date $dt: is '${DateTime.weekday(wkday)}' but " +
s"should be '${DateTime.weekday(dt.weekday)}'")
dt
}


@ -37,8 +37,8 @@ private[http] class HeaderParser(val input: ParserInput) extends Parser with Dyn
def success(result: HttpHeader :: HNil): Result = Right(result.head)
def parseError(error: ParseError): Result = Left(errorInfo(error))
def failure(error: Throwable): Result = error match {
case e: IllegalUriException ⇒ Left(e.info)
case NonFatal(e) ⇒ Left(ErrorInfo.fromCompoundString(e.getMessage))
case IllegalUriException(info) ⇒ Left(info)
case NonFatal(e) ⇒ Left(ErrorInfo.fromCompoundString(e.getMessage))
}
def ruleNotFound(ruleName: String): Result = throw HeaderParser.RuleNotFoundException
}


@ -56,7 +56,7 @@ private[parser] trait LinkHeader { this: Parser with CommonRules with CommonActi
capture(oneOrMore(!'"' ~ !';' ~ !',' ~ VCHAR)) ~> { s ⇒
try new UriParser(s).parseUriReference()
catch {
case e: IllegalUriException ⇒ throw new ParsingException(e.info.withSummaryPrepended("Illegal `Link` header relation-type"))
case IllegalUriException(info) ⇒ throw ParsingException(info.withSummaryPrepended("Illegal `Link` header relation-type"))
}
s
}


@ -136,15 +136,17 @@ object DateTime {
// compute yearday from month/monthday
val m = month - 1
var d = (m % 7) * 30 + (m % 7 + 1) / 2 + day
val isLeap = ((year % 4 == 0) && !(year % 100 == 0)) || (year % 400 == 0)
val m7 = m % 7
var d = m7 * 30 + ((m7 + 1) >> 1) + day
val isLeap = isLeapYear(year)
if (m >= 7) d += 214
if (d >= 61) d -= 1 // skip non-existent Feb 30
if (!isLeap && (d >= 60)) d -= 1 // skip non-existent Feb 29
// convert year/yearday to days since Jan 1, 1970, 00:00:00
val y = year - 1
d += y * 365 + y / 4 - y / 100 + y / 400
val yd = y / 100
d += y * 365 + (y >> 2) - yd + (yd >> 2)
val dn = d - (1969 * 365 + 492 - 19 + 4)
val c = (dn - 1) * 86400L + hour * 3600L + minute * 60L + second // seconds since Jan 1, 1970, 00:00:00
@ -180,7 +182,7 @@ object DateTime {
else {
y += 100 * (d / 36524)
d %= 36524
y += 4 * (d / 1461)
y += (d / 1461) << 2
d %= 1461
if (d == 1460) { y += 3; d = 365 } // last year out of 4 is long
else {
@ -189,14 +191,14 @@ object DateTime {
}
}
val isLeap = (((y & 0x03) == 0) && !(y % 100 == 0)) || (y % 400 == 0)
val isLeap = isLeapYear(y)
// compute month/monthday from year/yearday
if (!isLeap && (d >= 59)) d += 1 // skip non-existent Feb 29
if (d >= 60) d += 1 // skip non-existent Feb 30
val d214 = d % 214
val d214_61 = d214 % 61
var mon = (d214 / 61) * 2 + d214_61 / 31
var mon = ((d214 / 61) << 1) + d214_61 / 31
if (d > 213) mon += 7
d = d214_61 % 31 + 1
@ -211,6 +213,13 @@ object DateTime {
isLeapYear = isLeap)
}
private def isLeapYear(year: Int): Boolean =
((year & 0x03) == 0) && {
val q = year / 100
val r = year % 100
r != 0 || (q & 0x03) == 0
}
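// Editor's note, worked examples of the leap-year rule above:
//   2012: 2012 & 3 == 0, r = 12 != 0                  -> leap year
//   1900: 1900 & 3 == 0, r = 0, q = 19, 19 & 3 != 0   -> not a leap year
//   2000: 2000 & 3 == 0, r = 0, q = 20, 20 & 3 == 0   -> leap year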
/**
* Creates a new `DateTime` instance for the current point in time.
* Note that this implementation discards milliseconds (i.e. rounds down to full seconds).


@ -55,31 +55,25 @@ package object util {
.flatten(FlattenStrategy.concat)
}
private[http] implicit class EnhancedSource[T](val underlying: Source[T]) {
def printEvent(marker: String): Source[T] =
underlying.transform(() ⇒ new PushStage[T, T] {
override def onPush(element: T, ctx: Context[T]): Directive = {
println(s"$marker: $element")
ctx.push(element)
}
override def onUpstreamFailure(cause: Throwable, ctx: Context[T]): TerminationDirective = {
println(s"$marker: Failure $cause")
super.onUpstreamFailure(cause, ctx)
}
override def onUpstreamFinish(ctx: Context[T]): TerminationDirective = {
println(s"$marker: Terminated")
super.onUpstreamFinish(ctx)
}
})
/**
* Drain this stream into a Vector and provide it as a future value.
*
* FIXME: Should be part of akka-streams
*/
def collectAll(implicit materializer: FlowMaterializer): Future[immutable.Seq[T]] =
underlying.fold(Vector.empty[T])(_ :+ _)
}
def printEvent[T](marker: String): Flow[T, T] =
Flow[T].transform(() ⇒ new PushStage[T, T] {
override def onPush(element: T, ctx: Context[T]): Directive = {
println(s"$marker: $element")
ctx.push(element)
}
override def onUpstreamFailure(cause: Throwable, ctx: Context[T]): TerminationDirective = {
println(s"$marker: Error $cause")
super.onUpstreamFailure(cause, ctx)
}
override def onUpstreamFinish(ctx: Context[T]): TerminationDirective = {
println(s"$marker: Complete")
super.onUpstreamFinish(ctx)
}
override def onDownstreamFinish(ctx: Context[T]): TerminationDirective = {
println(s"$marker: Cancel")
super.onDownstreamFinish(ctx)
}
})
private[http] implicit class AddFutureAwaitResult[T](future: Future[T]) {
/** "Safe" Await.result that doesn't throw away half of the stacktrace */


@ -36,7 +36,7 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll {
implicit val materializer = FlowMaterializer()
"The server-side HTTP infrastructure" should {
"The low-level HTTP infrastructure" should {
"properly bind a server" in {
val (hostname, port) = temporaryServerHostnameAndPort()
@ -70,6 +70,7 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll {
val (serverIn, serverOut) = acceptConnection()
val clientOutSub = clientOut.expectSubscription()
clientOutSub.expectRequest()
clientOutSub.sendNext(HttpRequest(uri = "/abc"))
val serverInSub = serverIn.expectSubscription()
@ -77,12 +78,20 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll {
serverIn.expectNext().uri shouldEqual Uri(s"http://$hostname:$port/abc")
val serverOutSub = serverOut.expectSubscription()
serverOutSub.expectRequest()
serverOutSub.sendNext(HttpResponse(entity = "yeah"))
val clientInSub = clientIn.expectSubscription()
clientInSub.request(1)
val response = clientIn.expectNext()
toStrict(response.entity) shouldEqual HttpEntity("yeah")
clientOutSub.sendComplete()
serverInSub.request(1) // work-around for #16552
serverIn.expectComplete()
serverOutSub.expectCancellation()
clientInSub.request(1) // work-around for #16552
clientIn.expectComplete()
}
"properly complete a chunked request/response cycle" in new TestSetup {
@ -104,6 +113,7 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll {
Await.result(chunkStream.grouped(4).runWith(Sink.head), 100.millis) shouldEqual chunks
val serverOutSub = serverOut.expectSubscription()
serverOutSub.expectRequest()
serverOutSub.sendNext(HttpResponse(206, List(RawHeader("Age", "42")), chunkedEntity))
val clientInSub = clientIn.expectSubscription()
@ -111,8 +121,42 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll {
val HttpResponse(StatusCodes.PartialContent, List(RawHeader("Age", "42"), Server(_), Date(_)),
Chunked(`chunkedContentType`, chunkStream2), HttpProtocols.`HTTP/1.1`) = clientIn.expectNext()
Await.result(chunkStream2.grouped(1000).runWith(Sink.head), 100.millis) shouldEqual chunks
clientOutSub.sendComplete()
serverInSub.request(1) // work-around for #16552
serverIn.expectComplete()
serverOutSub.expectCancellation()
clientInSub.request(1) // work-around for #16552
clientIn.expectComplete()
}
"be able to deal with eager closing of the request stream on the client side" in new TestSetup {
val (clientOut, clientIn) = openNewClientConnection()
val (serverIn, serverOut) = acceptConnection()
val clientOutSub = clientOut.expectSubscription()
clientOutSub.sendNext(HttpRequest(uri = "/abc"))
clientOutSub.sendComplete() // complete early
val serverInSub = serverIn.expectSubscription()
serverInSub.request(1)
serverIn.expectNext().uri shouldEqual Uri(s"http://$hostname:$port/abc")
val serverOutSub = serverOut.expectSubscription()
serverOutSub.expectRequest()
serverOutSub.sendNext(HttpResponse(entity = "yeah"))
val clientInSub = clientIn.expectSubscription()
clientInSub.request(1)
val response = clientIn.expectNext()
toStrict(response.entity) shouldEqual HttpEntity("yeah")
serverInSub.request(1) // work-around for #16552
serverIn.expectComplete()
serverOutSub.expectCancellation()
clientInSub.request(1) // work-around for #16552
clientIn.expectComplete()
}
}
override def afterAll() = system.shutdown()


@ -0,0 +1,330 @@
/*
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.http.engine.client
import java.net.InetSocketAddress
import org.scalatest.Inside
import akka.util.ByteString
import akka.event.NoLogging
import akka.stream.FlowMaterializer
import akka.stream.testkit.{ AkkaSpec, StreamTestKit }
import akka.stream.scaladsl._
import akka.http.model.HttpEntity._
import akka.http.model.HttpMethods._
import akka.http.model._
import akka.http.model.headers._
import akka.http.util._
class HttpClientSpec extends AkkaSpec("akka.loggers = []\n akka.loglevel = OFF") with Inside {
implicit val materializer = FlowMaterializer()
"The client implementation" should {
"properly handle a request/response round-trip" which {
"has a request with empty entity" in new TestSetup {
requestsSub.sendNext(HttpRequest())
expectWireData(
"""GET / HTTP/1.1
|Host: example.com:80
|User-Agent: akka-http/test
|
|""")
netInSub.expectRequest(16)
sendWireData(
"""HTTP/1.1 200 OK
|Content-Length: 0
|
|""")
responsesSub.request(1)
responses.expectNext(HttpResponse())
requestsSub.sendComplete()
netOut.expectComplete()
netInSub.sendComplete()
responses.expectComplete()
}
"has a request with default entity" in new TestSetup {
val probe = StreamTestKit.PublisherProbe[ByteString]()
requestsSub.sendNext(HttpRequest(PUT, entity = HttpEntity(ContentTypes.`application/octet-stream`, 8, Source(probe))))
expectWireData(
"""PUT / HTTP/1.1
|Host: example.com:80
|User-Agent: akka-http/test
|Content-Type: application/octet-stream
|Content-Length: 8
|
|""")
val sub = probe.expectSubscription()
sub.expectRequest(4)
sub.sendNext(ByteString("ABC"))
expectWireData("ABC")
sub.sendNext(ByteString("DEF"))
expectWireData("DEF")
sub.sendNext(ByteString("XY"))
expectWireData("XY")
sub.sendComplete()
netInSub.expectRequest(16)
sendWireData(
"""HTTP/1.1 200 OK
|Content-Length: 0
|
|""")
responsesSub.request(1)
responses.expectNext(HttpResponse())
requestsSub.sendComplete()
netOut.expectComplete()
netInSub.sendComplete()
responses.expectComplete()
}
"has a response with a default entity" in new TestSetup {
requestsSub.sendNext(HttpRequest())
expectWireData(
"""GET / HTTP/1.1
|Host: example.com:80
|User-Agent: akka-http/test
|
|""")
netInSub.expectRequest(16)
sendWireData(
"""HTTP/1.1 200 OK
|Transfer-Encoding: chunked
|
|""")
responsesSub.request(1)
val HttpResponse(_, _, HttpEntity.Chunked(ct, chunks), _) = responses.expectNext()
ct shouldEqual ContentTypes.`application/octet-stream`
val probe = StreamTestKit.SubscriberProbe[ChunkStreamPart]()
chunks.runWith(Sink(probe))
val sub = probe.expectSubscription()
sendWireData("3\nABC\n")
sub.request(1)
probe.expectNext(HttpEntity.Chunk("ABC"))
sendWireData("4\nDEFX\n")
sub.request(1)
probe.expectNext(HttpEntity.Chunk("DEFX"))
sendWireData("0\n\n")
sub.request(1)
probe.expectNext(HttpEntity.LastChunk)
probe.expectComplete()
requestsSub.sendComplete()
netOut.expectComplete()
netInSub.sendComplete()
responses.expectComplete()
}
"exhibits eager request stream completion" in new TestSetup {
requestsSub.sendNext(HttpRequest())
requestsSub.sendComplete()
expectWireData(
"""GET / HTTP/1.1
|Host: example.com:80
|User-Agent: akka-http/test
|
|""")
netInSub.expectRequest(16)
sendWireData(
"""HTTP/1.1 200 OK
|Content-Length: 0
|
|""")
responsesSub.request(1)
responses.expectNext(HttpResponse())
netOut.expectComplete()
netInSub.sendComplete()
responses.expectComplete()
}
}
"produce proper errors" which {
"catch the entity stream being shorter than the Content-Length" in new TestSetup {
val probe = StreamTestKit.PublisherProbe[ByteString]()
requestsSub.sendNext(HttpRequest(PUT, entity = HttpEntity(ContentTypes.`application/octet-stream`, 8, Source(probe))))
expectWireData(
"""PUT / HTTP/1.1
|Host: example.com:80
|User-Agent: akka-http/test
|Content-Type: application/octet-stream
|Content-Length: 8
|
|""")
val sub = probe.expectSubscription()
sub.expectRequest(4)
sub.sendNext(ByteString("ABC"))
expectWireData("ABC")
sub.sendNext(ByteString("DEF"))
expectWireData("DEF")
sub.sendComplete()
val InvalidContentLengthException(info) = netOut.expectError()
info.summary shouldEqual "HTTP message had declared Content-Length 8 but entity data stream amounts to 2 bytes less"
netInSub.sendComplete()
responses.expectComplete()
netInSub.expectCancellation()
}
"catch the entity stream being longer than the Content-Length" in new TestSetup {
val probe = StreamTestKit.PublisherProbe[ByteString]()
requestsSub.sendNext(HttpRequest(PUT, entity = HttpEntity(ContentTypes.`application/octet-stream`, 8, Source(probe))))
expectWireData(
"""PUT / HTTP/1.1
|Host: example.com:80
|User-Agent: akka-http/test
|Content-Type: application/octet-stream
|Content-Length: 8
|
|""")
val sub = probe.expectSubscription()
sub.expectRequest(4)
sub.sendNext(ByteString("ABC"))
expectWireData("ABC")
sub.sendNext(ByteString("DEF"))
expectWireData("DEF")
sub.sendNext(ByteString("XYZ"))
val InvalidContentLengthException(info) = netOut.expectError()
info.summary shouldEqual "HTTP message had declared Content-Length 8 but entity data stream amounts to more bytes"
netInSub.sendComplete()
responses.expectComplete()
netInSub.expectCancellation()
}
"catch illegal response starts" in new TestSetup {
requestsSub.sendNext(HttpRequest())
expectWireData(
"""GET / HTTP/1.1
|Host: example.com:80
|User-Agent: akka-http/test
|
|""")
netInSub.expectRequest(16)
sendWireData(
"""HTTP/1.2 200 OK
|
|""")
val error @ IllegalResponseException(info) = responses.expectError()
info.summary shouldEqual "The server-side HTTP version is not supported"
netOut.expectError(error)
requestsSub.expectCancellation()
}
"catch illegal response chunks" in new TestSetup {
requestsSub.sendNext(HttpRequest())
expectWireData(
"""GET / HTTP/1.1
|Host: example.com:80
|User-Agent: akka-http/test
|
|""")
netInSub.expectRequest(16)
sendWireData(
"""HTTP/1.1 200 OK
|Transfer-Encoding: chunked
|
|""")
responsesSub.request(1)
val HttpResponse(_, _, HttpEntity.Chunked(ct, chunks), _) = responses.expectNext()
ct shouldEqual ContentTypes.`application/octet-stream`
val probe = StreamTestKit.SubscriberProbe[ChunkStreamPart]()
chunks.runWith(Sink(probe))
val sub = probe.expectSubscription()
sendWireData("3\nABC\n")
sub.request(1)
probe.expectNext(HttpEntity.Chunk("ABC"))
sendWireData("4\nDEFXX")
sub.request(1)
val error @ EntityStreamException(info) = probe.expectError()
info.summary shouldEqual "Illegal chunk termination"
responses.expectComplete()
netOut.expectComplete()
requestsSub.expectCancellation()
}
"catch a response start truncation" in new TestSetup {
requestsSub.sendNext(HttpRequest())
expectWireData(
"""GET / HTTP/1.1
|Host: example.com:80
|User-Agent: akka-http/test
|
|""")
netInSub.expectRequest(16)
sendWireData("HTTP/1.1 200 OK")
netInSub.sendComplete()
val error @ IllegalResponseException(info) = responses.expectError()
info.summary shouldEqual "Illegal HTTP message start"
netOut.expectError(error)
requestsSub.expectCancellation()
}
}
}
class TestSetup {
val requests = StreamTestKit.PublisherProbe[HttpRequest]
val responses = StreamTestKit.SubscriberProbe[HttpResponse]
val remoteAddress = new InetSocketAddress("example.com", 80)
def settings = ClientConnectionSettings(system)
.copy(userAgentHeader = Some(`User-Agent`(List(ProductVersion("akka-http", "test")))))
val (netOut, netIn) = {
val netOut = StreamTestKit.SubscriberProbe[ByteString]
val netIn = StreamTestKit.PublisherProbe[ByteString]
val clientFlow = HttpClient.transportToConnectionClientFlow(
Flow(Sink(netOut), Source(netIn)), remoteAddress, settings, NoLogging)
Source(requests).via(clientFlow).runWith(Sink(responses))
netOut -> netIn
}
def wipeDate(string: String) =
string.fastSplit('\n').map {
case s if s.startsWith("Date:") ⇒ "Date: XXXX\r"
case s ⇒ s
}.mkString("\n")
val netInSub = netIn.expectSubscription()
val netOutSub = netOut.expectSubscription()
val requestsSub = requests.expectSubscription()
val responsesSub = responses.expectSubscription()
def sendWireData(data: String): Unit = sendWireData(ByteString(data.stripMarginWithNewline("\r\n"), "ASCII"))
def sendWireData(data: ByteString): Unit = netInSub.sendNext(data)
def expectWireData(s: String) = {
netOutSub.request(1)
netOut.expectNext().utf8String shouldEqual s.stripMarginWithNewline("\r\n")
}
def closeNetworkInput(): Unit = netInSub.sendComplete()
}
}

View file

@ -120,8 +120,9 @@ class RequestParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll {
|Host: x
|
|ABCDPATCH"""
}.toCharArray.map(_.toString).toSeq should rawMultiParseTo(
HttpRequest(PUT, "/resource/yes", List(Host("x")), "ABCD".getBytes))
}.toCharArray.map(_.toString).toSeq should generalRawMultiParseTo(
Right(HttpRequest(PUT, "/resource/yes", List(Host("x")), "ABCD".getBytes)),
Left(MessageStartError(400, ErrorInfo("Illegal HTTP message start"))))
closeAfterResponseCompletion shouldEqual Seq(false)
}
@ -232,7 +233,7 @@ class RequestParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll {
val parser = newParser
val result = multiParse(newParser)(Seq(prep(start + manyChunks)))
val HttpEntity.Chunked(_, chunks) = result.head.right.get.req.entity
val strictChunks = chunks.collectAll.awaitResult(awaitAtMost)
val strictChunks = chunks.grouped(100000).runWith(Sink.head).awaitResult(awaitAtMost)
strictChunks.size shouldEqual numChunks
}
}
@ -442,7 +443,7 @@ class RequestParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll {
def multiParse(parser: HttpRequestParser)(input: Seq[String]): Seq[Either[RequestOutput, StrictEqualHttpRequest]] =
Source(input.toList)
.map(ByteString.apply)
.section(name("parser"))(_.transform(() parser))
.section(name("parser"))(_.transform(() parser.stage))
.splitWhen(x x.isInstanceOf[MessageStart] || x.isInstanceOf[EntityStreamError])
.headAndTail
.collect {
@ -461,7 +462,7 @@ class RequestParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll {
}
.flatten(FlattenStrategy.concat)
.map(strictEqualify)
.collectAll
.grouped(100000).runWith(Sink.head)
.awaitResult(awaitAtMost)
protected def parserSettings: ParserSettings = ParserSettings(system)
@ -474,7 +475,7 @@ class RequestParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll {
}
private def compactEntityChunks(data: Source[ChunkStreamPart]): Future[Seq[ChunkStreamPart]] =
data.collectAll
data.grouped(100000).runWith(Sink.head)
.fast.recover { case _: NoSuchElementException ⇒ Nil }
def prep(response: String) = response.stripMarginWithNewline("\r\n")
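The recurring replacement of the removed collectAll test helper with `.grouped(100000).runWith(Sink.head)` (here and in several files below) is the idiom for draining a finite stream into one sequence: grouped(n) batches up to n elements into a single Seq and Sink.head completes with that Seq as a Future. A minimal sketch, not from this commit:

import scala.concurrent.Future
import akka.stream.FlowMaterializer
import akka.stream.scaladsl.{ Sink, Source }

implicit val materializer = FlowMaterializer()   // assumes an implicit ActorSystem

// Drain a finite stream (well under 100000 elements) into one Seq:
val elements: Source[Int] = Source(List(1, 2, 3))
val all: Future[Seq[Int]] = elements.grouped(100000).runWith(Sink.head)
// Caveat mirrored by the recover clauses below: an empty stream never emits a
// group, so Sink.head fails with NoSuchElementException.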

View file

@ -261,7 +261,7 @@ class ResponseParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll {
val future =
Source(input.toList)
.map(ByteString.apply)
.section(name("parser"))(_.transform(() ⇒ newParser(requestMethod)))
.section(name("parser"))(_.transform(() ⇒ newParserStage(requestMethod)))
.splitWhen(x ⇒ x.isInstanceOf[MessageStart] || x.isInstanceOf[EntityStreamError])
.headAndTail
.collect {
@ -279,14 +279,16 @@ class ResponseParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll {
}
.flatten(FlattenStrategy.concat)
.map(strictEqualify)
.grouped(1000).runWith(Sink.head)
.grouped(100000).runWith(Sink.head)
Await.result(future, 500.millis)
}
def parserSettings: ParserSettings = ParserSettings(system)
def newParser(requestMethod: HttpMethod = GET) = {
val parser = new HttpResponseParser(parserSettings, HttpHeaderParser(parserSettings)(), () ⇒ requestMethod)
parser
def newParserStage(requestMethod: HttpMethod = GET) = {
val parser = new HttpResponseParser(parserSettings, HttpHeaderParser(parserSettings)())
parser.setRequestMethodForNextResponse(requestMethod)
parser.stage
}
private def compactEntity(entity: ResponseEntity): Future[ResponseEntity] =
@ -296,7 +298,7 @@ class ResponseParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll {
}
private def compactEntityChunks(data: Source[ChunkStreamPart]): Future[Source[ChunkStreamPart]] =
data.grouped(1000).runWith(Sink.head)
data.grouped(100000).runWith(Sink.head)
.fast.map(source(_: _*))
.fast.recover { case _: NoSuchElementException ⇒ source() }

View file

@ -4,8 +4,10 @@
package akka.http.engine.server
import scala.util.Random
import scala.annotation.tailrec
import scala.concurrent.duration._
import org.scalatest.{ Inside, BeforeAndAfterAll, Matchers }
import org.scalatest.Inside
import akka.event.NoLogging
import akka.util.ByteString
import akka.stream.scaladsl._
@ -18,7 +20,7 @@ import HttpEntity._
import MediaTypes._
import HttpMethods._
class HttpServerSpec extends AkkaSpec with Matchers with BeforeAndAfterAll with Inside {
class HttpServerSpec extends AkkaSpec("akka.loggers = []\n akka.loglevel = OFF") with Inside {
implicit val materializer = FlowMaterializer()
"The server implementation" should {
@ -607,6 +609,45 @@ class HttpServerSpec extends AkkaSpec with Matchers with BeforeAndAfterAll with
|
|""".stripMarginWithNewline("\r\n")
}
"correctly consume and render large requests and responses" in new TestSetup {
send("""POST / HTTP/1.1
|Host: example.com
|Content-Length: 100000
|
|""".stripMarginWithNewline("\r\n"))
val HttpRequest(POST, _, _, entity, _) = expectRequest
responsesSub.expectRequest()
responsesSub.sendNext(HttpResponse(entity = entity))
responsesSub.sendComplete()
netOutSub.request(1)
wipeDate(netOut.expectNext().utf8String) shouldEqual
"""HTTP/1.1 200 OK
|Server: akka-http/test
|Date: XXXX
|Content-Type: application/octet-stream
|Content-Length: 100000
|
|""".stripMarginWithNewline("\r\n")
val random = new Random()
@tailrec def rec(bytesLeft: Int): Unit =
if (bytesLeft > 0) {
val count = math.min(random.nextInt(1000) + 1, bytesLeft)
val data = random.alphanumeric.take(count).mkString
send(data)
netOutSub.request(1)
netOut.expectNext().utf8String shouldEqual data
rec(bytesLeft - count)
}
rec(100000)
netInSub.sendComplete()
requests.expectComplete()
netOut.expectComplete()
}
}
class TestSetup {
@ -641,7 +682,7 @@ class HttpServerSpec extends AkkaSpec with Matchers with BeforeAndAfterAll with
def expectNoRequest(max: FiniteDuration): Unit = requests.expectNoMsg(max)
def send(data: ByteString): Unit = netInSub.sendNext(data)
def send(data: String): Unit = send(ByteString(data, "ASCII"))
def send(data: String): Unit = send(ByteString(data, "UTF8"))
def closeNetworkInput(): Unit = netInSub.sendComplete()
}
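The "correctly consume and render large requests and responses" test above is the regression test for the issue fixed by this PR (#16574): a 100000-byte request entity arrives in random-sized network chunks and must be consumable and renderable without buffering. At the application level the echo in that test amounts to reusing the streamed request entity as the response entity; a minimal, hypothetical handler sketch (not part of this commit):

import akka.stream.scaladsl.Flow
import akka.http.model.{ HttpRequest, HttpResponse }

// Echo every request entity back; the entity remains a stream, so the full
// 100000 bytes are never held in memory at once.
val echoHandler: Flow[HttpRequest, HttpResponse] =
  Flow[HttpRequest].map(request ⇒ HttpResponse(entity = request.entity))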

View file

@ -143,7 +143,7 @@ class UriSpec extends WordSpec with Matchers {
"not accept illegal IPv6 literals" in {
// 5 char quad
the[IllegalUriException] thrownBy Host("[::12345]") shouldBe {
new IllegalUriException("Illegal URI host: Invalid input '5', expected !HEXDIG, ':' or ']' (line 1, column 8)",
IllegalUriException("Illegal URI host: Invalid input '5', expected !HEXDIG, ':' or ']' (line 1, column 8)",
"[::12345]\n" +
" ^")
}
@ -443,42 +443,42 @@ class UriSpec extends WordSpec with Matchers {
"produce proper error messages for illegal URIs" in {
// illegal scheme
the[IllegalUriException] thrownBy Uri("foö:/a") shouldBe {
new IllegalUriException("Illegal URI reference: Invalid input 'ö', expected scheme-char, ':', path-segment-char, '%', '/', '?', '#' or 'EOI' (line 1, column 3)",
IllegalUriException("Illegal URI reference: Invalid input 'ö', expected scheme-char, ':', path-segment-char, '%', '/', '?', '#' or 'EOI' (line 1, column 3)",
"foö:/a\n" +
" ^")
}
// illegal userinfo
the[IllegalUriException] thrownBy Uri("http://user:ö@host") shouldBe {
new IllegalUriException("Illegal URI reference: Invalid input 'ö', expected userinfo-char, '%', '@' or DIGIT (line 1, column 13)",
IllegalUriException("Illegal URI reference: Invalid input 'ö', expected userinfo-char, '%', '@' or DIGIT (line 1, column 13)",
"http://user:ö@host\n" +
" ^")
}
// illegal percent-encoding
the[IllegalUriException] thrownBy Uri("http://use%2G@host") shouldBe {
new IllegalUriException("Illegal URI reference: Invalid input 'G', expected HEXDIG (line 1, column 13)",
IllegalUriException("Illegal URI reference: Invalid input 'G', expected HEXDIG (line 1, column 13)",
"http://use%2G@host\n" +
" ^")
}
// illegal path
the[IllegalUriException] thrownBy Uri("http://www.example.com/name with spaces/") shouldBe {
new IllegalUriException("Illegal URI reference: Invalid input ' ', expected path-segment-char, '%', '/', '?', '#' or 'EOI' (line 1, column 28)",
IllegalUriException("Illegal URI reference: Invalid input ' ', expected path-segment-char, '%', '/', '?', '#' or 'EOI' (line 1, column 28)",
"http://www.example.com/name with spaces/\n" +
" ^")
}
// illegal path with control character
the[IllegalUriException] thrownBy Uri("http:///with\newline") shouldBe {
new IllegalUriException("Illegal URI reference: Invalid input '\\n', expected path-segment-char, '%', '/', '?', '#' or 'EOI' (line 1, column 13)",
IllegalUriException("Illegal URI reference: Invalid input '\\n', expected path-segment-char, '%', '/', '?', '#' or 'EOI' (line 1, column 13)",
"http:///with\n" +
" ^")
}
// illegal query
the[IllegalUriException] thrownBy Uri("?a=b=c") shouldBe {
new IllegalUriException("Illegal URI reference: Invalid input '=', expected '+', query-char, '%', '&', '#' or 'EOI' (line 1, column 5)",
IllegalUriException("Illegal URI reference: Invalid input '=', expected '+', query-char, '%', '&', '#' or 'EOI' (line 1, column 5)",
"?a=b=c\n" +
" ^")
}

View file

@ -96,6 +96,6 @@ trait RouteTestResultComponent {
failTest("Request was neither completed nor rejected within " + timeout)
private def awaitAllElements[T](data: Source[T]): immutable.Seq[T] =
data.collectAll.awaitResult(timeout)
data.grouped(100000).runWith(Sink.head).awaitResult(timeout)
}
}

View file

@ -59,7 +59,7 @@ class CodingDirectivesSpec extends RoutingSpec {
decodeRequest(Gzip) { echoRequestContent }
} ~> check {
status shouldEqual BadRequest
responseAs[String] shouldEqual "The request's encoding is corrupt:\nNot in GZIP format"
responseAs[String] shouldEqual "The request's encoding is corrupt"
}
}
"reject truncated gzip request content" in {
@ -67,7 +67,7 @@ class CodingDirectivesSpec extends RoutingSpec {
decodeRequest(Gzip) { echoRequestContent }
} ~> check {
status shouldEqual BadRequest
responseAs[String] shouldEqual "The request's encoding is corrupt:\nTruncated GZIP stream"
responseAs[String] shouldEqual "The request's encoding is corrupt"
}
}
"reject requests with content encoded with 'deflate'" in {
@ -397,7 +397,7 @@ class CodingDirectivesSpec extends RoutingSpec {
Post("/", helloGzipped) ~> `Content-Encoding`(deflate) ~>
decompressRequest() { echoRequestContent } ~> check {
status shouldEqual BadRequest
responseAs[String] shouldEqual "The request's encoding is corrupt:\nincorrect header check"
responseAs[String] shouldEqual "The request's encoding is corrupt"
}
}
}

View file

@ -5,17 +5,16 @@
package akka.http.server
package directives
import scala.concurrent.Await
import scala.concurrent.duration._
import akka.http.model.StatusCodes._
import akka.http.model._
import akka.http.model.headers._
import akka.http.util._
import akka.stream.scaladsl.Source
import akka.stream.scaladsl.{ Sink, Source }
import akka.util.ByteString
import org.scalatest.{ Inside, Inspectors }
import scala.concurrent.Await
import scala.concurrent.duration._
class RangeDirectivesSpec extends RoutingSpec with Inspectors with Inside {
lazy val wrs =
mapSettings(_.copy(rangeCountLimit = 10, rangeCoalescingThreshold = 1L)) &
@ -100,7 +99,7 @@ class RangeDirectivesSpec extends RoutingSpec with Inspectors with Inside {
wrs { complete("Some random and not super short entity.") }
} ~> check {
header[`Content-Range`] should be(None)
val parts = Await.result(responseAs[Multipart.ByteRanges].parts.collectAll, 1.second)
val parts = Await.result(responseAs[Multipart.ByteRanges].parts.grouped(1000).runWith(Sink.head), 1.second)
parts.size shouldEqual 2
inside(parts(0)) {
case Multipart.ByteRanges.BodyPart(range, entity, unit, headers) ⇒
@ -125,7 +124,7 @@ class RangeDirectivesSpec extends RoutingSpec with Inspectors with Inside {
wrs { complete(HttpEntity.Default(MediaTypes.`text/plain`, content.length, entityData())) }
} ~> check {
header[`Content-Range`] should be(None)
val parts = Await.result(responseAs[Multipart.ByteRanges].parts.collectAll, 1.second)
val parts = Await.result(responseAs[Multipart.ByteRanges].parts.grouped(1000).runWith(Sink.head), 1.second)
parts.size shouldEqual 2
}
}

View file

@ -27,15 +27,10 @@ object ExceptionHandler {
def default(settings: RoutingSettings)(implicit ec: ExecutionContext): ExceptionHandler =
apply(default = true) {
case e: IllegalRequestException ⇒ ctx ⇒ {
case IllegalRequestException(info, status) ⇒ ctx ⇒ {
ctx.log.warning("Illegal request {}\n\t{}\n\tCompleting with '{}' response",
ctx.request, e.getMessage, e.status)
ctx.complete(e.status, e.info.format(settings.verboseErrorMessages))
}
case e: RequestProcessingException ⇒ ctx ⇒ {
ctx.log.warning("Request {} could not be handled normally\n\t{}\n\tCompleting with '{}' response",
ctx.request, e.getMessage, e.status)
ctx.complete(e.status, e.info.format(settings.verboseErrorMessages))
ctx.request, info.formatPretty, status)
ctx.complete(status, info.format(settings.verboseErrorMessages))
}
case NonFatal(e) ⇒ ctx ⇒ {
ctx.log.error(e, "Error during processing of request {}", ctx.request)
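IllegalRequestException is now taken apart with an extractor that yields the ErrorInfo and the status code directly, which is what lets the default handler above log the pretty-formatted info and complete with the configured verbosity. A minimal, hypothetical sketch of the same matching style as a plain Throwable-to-HttpResponse mapping (deliberately not the actual Route/ExceptionHandler types; import locations assumed):

import scala.util.control.NonFatal
import akka.http.model.{ HttpResponse, IllegalRequestException, StatusCodes }

// Hypothetical error mapping: expose only the ErrorInfo summary to the client.
val toErrorResponse: PartialFunction[Throwable, HttpResponse] = {
  case IllegalRequestException(info, status) ⇒ HttpResponse(status, entity = info.summary)
  case NonFatal(_)                           ⇒ HttpResponse(StatusCodes.InternalServerError)
}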

View file

@ -59,9 +59,9 @@ trait CodingDirectives {
def applyDecoder =
mapRequest(decoder.decode(_).mapEntity(StreamUtils.mapEntityError {
case NonFatal(e) ⇒
new IllegalRequestException(
IllegalRequestException(
StatusCodes.BadRequest,
ErrorInfo(s"The request's encoding is corrupt:\n${e.getMessage}"))
ErrorInfo("The request's encoding is corrupt", e.getMessage))
}))
requestEntityEmpty | (
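The decoder failure above now splits the message into ErrorInfo(summary, detail) instead of appending the cause's message to the summary; combined with the default exception handler's info.format(settings.verboseErrorMessages), this is why the CodingDirectivesSpec expectations earlier in this diff see only "The request's encoding is corrupt". A minimal sketch, not from this commit:

import akka.http.model.ErrorInfo

val info = ErrorInfo("The request's encoding is corrupt", "Not in GZIP format")
info.format(false)  // summary only: what the client sees by default
info.format(true)   // additionally includes the "Not in GZIP format" detail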

View file

@ -79,7 +79,7 @@ trait MultipartUnmarshallers {
case x ⇒ throw new IllegalStateException("Unexpected entity type from strict BodyPartParser: " + x)
}
builder += createStrictBodyPart(entity, headers)
case ParseError(errorInfo) ⇒ throw new ParsingException(errorInfo)
case ParseError(errorInfo) ⇒ throw ParsingException(errorInfo)
case x ⇒ throw new IllegalStateException(s"Unexpected BodyPartParser result $x in strict case")
}
createStrict(mediaType, builder.result())
@ -90,7 +90,7 @@ trait MultipartUnmarshallers {
.headAndTail
.collect {
case (BodyPartStart(headers, createEntity), entityParts) ⇒ createBodyPart(createEntity(entityParts), headers)
case (ParseError(errorInfo), _) ⇒ throw new ParsingException(errorInfo)
case (ParseError(errorInfo), _) ⇒ throw ParsingException(errorInfo)
}
createStreamed(entity.contentType.mediaType.asInstanceOf[MultipartMediaType], bodyParts)
}

View file

@ -49,8 +49,8 @@ trait PredefinedFromEntityUnmarshallers extends MultipartUnmarshallers {
val query = Uri.Query(string, nioCharset)
FormData(query)
} catch {
case ex: IllegalUriException ⇒
throw new IllegalArgumentException(ex.info.formatPretty.replace("Query,", "form content,"))
case IllegalUriException(info) ⇒
throw new IllegalArgumentException(info.formatPretty.replace("Query,", "form content,"))
}
}
}

View file

@ -53,6 +53,7 @@ private[scaladsl] case class GraphFlow[-In, CIn, COut, +Out](
private[scaladsl] def prepend(pipe: SourcePipe[In]): GraphSource[COut, Out] = {
val b = new FlowGraphBuilder()
b.allowCycles() // FIXME: remove after #16571 is cleared
val (nIn, nOut) = remap(b)
b.attachSource(nIn, pipe.appendPipe(inPipe))
GraphSource(b.partialBuild(), nOut, outPipe)
@ -75,6 +76,7 @@ private[scaladsl] case class GraphFlow[-In, CIn, COut, +Out](
case pipe: Pipe[Out, T] ⇒ copy(outPipe = outPipe.appendPipe(pipe))
case gFlow: GraphFlow[Out, _, _, T] ⇒
val (newGraph, nOut) = FlowGraphBuilder(graph) { b ⇒
b.allowCycles() // FIXME: remove after #16571 is cleared
val (oIn, oOut) = gFlow.remap(b)
b.connect(out, outPipe.via(gFlow.inPipe), oIn)
(b.partialBuild(), oOut)
@ -141,6 +143,7 @@ private[scaladsl] case class GraphSource[COut, +Out](graph: PartialFlowGraph, ou
case pipe: Pipe[Out, T] ⇒ copy(outPipe = outPipe.appendPipe(pipe))
case gFlow: GraphFlow[Out, _, _, T] ⇒
val (newGraph, nOut) = FlowGraphBuilder(graph) { b ⇒
b.allowCycles() // FIXME: remove after #16571 is cleared
val (oIn, oOut) = gFlow.remap(b)
b.connect(out, outPipe.via(gFlow.inPipe), oIn)
(b.partialBuild(), oOut)