From 12f05b03336faac63d36e8b06d63fdfbbe4c5df2 Mon Sep 17 00:00:00 2001 From: Mathias Date: Tue, 16 Dec 2014 10:54:24 +0100 Subject: [PATCH 01/10] =htp mark Http.ServerBinding `sealed` --- akka-http-core/src/main/scala/akka/http/Http.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/akka-http-core/src/main/scala/akka/http/Http.scala b/akka-http-core/src/main/scala/akka/http/Http.scala index a108634900..ea56d67c3b 100644 --- a/akka-http-core/src/main/scala/akka/http/Http.scala +++ b/akka-http-core/src/main/scala/akka/http/Http.scala @@ -91,7 +91,7 @@ object Http extends ExtensionId[HttpExt] with ExtensionIdProvider { /** * Represents a prospective HTTP server binding. */ - trait ServerBinding { + sealed trait ServerBinding { /** * The local address of the endpoint bound by the materialization of the `connections` [[Source]] * whose [[MaterializedMap]] is passed as parameter. From 25dd541f29bfbf905268086d0a1121a73d101fd9 Mon Sep 17 00:00:00 2001 From: Mathias Date: Tue, 16 Dec 2014 11:00:41 +0100 Subject: [PATCH 02/10] =htc small improvements in `DateTime` --- .../main/scala/akka/http/util/DateTime.scala | 21 +++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/akka-http-core/src/main/scala/akka/http/util/DateTime.scala b/akka-http-core/src/main/scala/akka/http/util/DateTime.scala index 44c2e36cd8..fe8a574be3 100644 --- a/akka-http-core/src/main/scala/akka/http/util/DateTime.scala +++ b/akka-http-core/src/main/scala/akka/http/util/DateTime.scala @@ -136,15 +136,17 @@ object DateTime { // compute yearday from month/monthday val m = month - 1 - var d = (m % 7) * 30 + (m % 7 + 1) / 2 + day - val isLeap = ((year % 4 == 0) && !(year % 100 == 0)) || (year % 400 == 0) + val m7 = m % 7 + var d = m7 * 30 + ((m7 + 1) >> 1) + day + val isLeap = isLeapYear(year) if (m >= 7) d += 214 if (d >= 61) d -= 1 // skip non-existent Feb 30 if (!isLeap && (d >= 60)) d -= 1 // skip non-existent Feb 29 // convert year/yearday to days since Jan 1, 1970, 00:00:00 val y = year - 1 - d += y * 365 + y / 4 - y / 100 + y / 400 + val yd = y / 100 + d += y * 365 + (y >> 2) - yd + (yd >> 2) val dn = d - (1969 * 365 + 492 - 19 + 4) val c = (dn - 1) * 86400L + hour * 3600L + minute * 60L + second // seconds since Jan 1, 1970, 00:00:00 @@ -180,7 +182,7 @@ object DateTime { else { y += 100 * (d / 36524) d %= 36524 - y += 4 * (d / 1461) + y += (d / 1461) << 2 d %= 1461 if (d == 1460) { y += 3; d = 365 } // last year out of 4 is long else { @@ -189,14 +191,14 @@ object DateTime { } } - val isLeap = (((y & 0x03) == 0) && !(y % 100 == 0)) || (y % 400 == 0) + val isLeap = isLeapYear(y) // compute month/monthday from year/yearday if (!isLeap && (d >= 59)) d += 1 // skip non-existent Feb 29 if (d >= 60) d += 1 // skip non-existent Feb 30 val d214 = d % 214 val d214_61 = d214 % 61 - var mon = (d214 / 61) * 2 + d214_61 / 31 + var mon = ((d214 / 61) << 1) + d214_61 / 31 if (d > 213) mon += 7 d = d214_61 % 31 + 1 @@ -211,6 +213,13 @@ object DateTime { isLeapYear = isLeap) } + private def isLeapYear(year: Int): Boolean = + ((year & 0x03) == 0) && { + val q = year / 100 + val r = year % 100 + r != 0 || (q & 0x03) == 0 + } + /** * Creates a new `DateTime` instance for the current point in time. * Note that this implementation discards milliseconds (i.e. rounds down to full seconds). From cf7e62d9ee929b3d844e7e26658cc24c1e1d1fff Mon Sep 17 00:00:00 2001 From: Mathias Date: Thu, 18 Dec 2014 16:55:54 +0100 Subject: [PATCH 03/10] =str apply temporary work-around for #16571 --- .../src/main/scala/akka/stream/scaladsl/GraphFlow.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/GraphFlow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/GraphFlow.scala index 259d35864f..73c6247fc2 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/GraphFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/GraphFlow.scala @@ -53,6 +53,7 @@ private[scaladsl] case class GraphFlow[-In, CIn, COut, +Out]( private[scaladsl] def prepend(pipe: SourcePipe[In]): GraphSource[COut, Out] = { val b = new FlowGraphBuilder() + b.allowCycles() // FIXME: remove after #16571 is cleared val (nIn, nOut) = remap(b) b.attachSource(nIn, pipe.appendPipe(inPipe)) GraphSource(b.partialBuild(), nOut, outPipe) @@ -75,6 +76,7 @@ private[scaladsl] case class GraphFlow[-In, CIn, COut, +Out]( case pipe: Pipe[Out, T] ⇒ copy(outPipe = outPipe.appendPipe(pipe)) case gFlow: GraphFlow[Out, _, _, T] ⇒ val (newGraph, nOut) = FlowGraphBuilder(graph) { b ⇒ + b.allowCycles() // FIXME: remove after #16571 is cleared val (oIn, oOut) = gFlow.remap(b) b.connect(out, outPipe.via(gFlow.inPipe), oIn) (b.partialBuild(), oOut) @@ -141,6 +143,7 @@ private[scaladsl] case class GraphSource[COut, +Out](graph: PartialFlowGraph, ou case pipe: Pipe[Out, T] ⇒ copy(outPipe = outPipe.appendPipe(pipe)) case gFlow: GraphFlow[Out, _, _, T] ⇒ val (newGraph, nOut) = FlowGraphBuilder(graph) { b ⇒ + b.allowCycles() // FIXME: remove after #16571 is cleared val (oIn, oOut) = gFlow.remap(b) b.connect(out, outPipe.via(gFlow.inPipe), oIn) (b.partialBuild(), oOut) From 44a684c390a4e0c4895900b95c405c3c2e6e334d Mon Sep 17 00:00:00 2001 From: Mathias Date: Thu, 18 Dec 2014 17:02:39 +0100 Subject: [PATCH 04/10] !htc clean up and unify HTTP exception model --- .../src/main/scala/akka/http/Http.scala | 4 -- .../engine/parsing/HttpHeaderParser.scala | 2 +- .../http/engine/rendering/RenderSupport.scala | 4 +- .../scala/akka/http/model/ErrorInfo.scala | 51 +++++++++++-------- .../scala/akka/http/model/HttpMessage.scala | 4 +- .../scala/akka/http/model/Multipart.scala | 4 +- .../src/main/scala/akka/http/model/Uri.scala | 3 +- .../akka/http/model/parser/CommonRules.scala | 2 +- .../akka/http/model/parser/HeaderParser.scala | 4 +- .../akka/http/model/parser/LinkHeader.scala | 2 +- .../test/scala/akka/http/model/UriSpec.scala | 14 ++--- .../directives/CodingDirectivesSpec.scala | 6 +-- .../akka/http/server/ExceptionHandler.scala | 11 ++-- .../server/directives/CodingDirectives.scala | 4 +- .../MultipartUnmarshallers.scala | 4 +- .../PredefinedFromEntityUnmarshallers.scala | 4 +- 16 files changed, 61 insertions(+), 62 deletions(-) diff --git a/akka-http-core/src/main/scala/akka/http/Http.scala b/akka-http-core/src/main/scala/akka/http/Http.scala index ea56d67c3b..ce6c18de92 100644 --- a/akka-http-core/src/main/scala/akka/http/Http.scala +++ b/akka-http-core/src/main/scala/akka/http/Http.scala @@ -205,10 +205,6 @@ object Http extends ExtensionId[HttpExt] with ExtensionIdProvider { def flow: Flow[HttpRequest, HttpResponse] } - class RequestTimeoutException(val request: HttpRequest, message: String) extends RuntimeException(message) - - class StreamException(val info: ErrorInfo) extends RuntimeException(info.summary) - //////////////////// EXTENSION SETUP /////////////////// def apply()(implicit system: ActorSystem): HttpExt = super.apply(system) diff --git a/akka-http-core/src/main/scala/akka/http/engine/parsing/HttpHeaderParser.scala b/akka-http-core/src/main/scala/akka/http/engine/parsing/HttpHeaderParser.scala index f3dd992535..de6973576e 100644 --- a/akka-http-core/src/main/scala/akka/http/engine/parsing/HttpHeaderParser.scala +++ b/akka-http-core/src/main/scala/akka/http/engine/parsing/HttpHeaderParser.scala @@ -402,7 +402,7 @@ private[http] object HttpHeaderParser { "Cache-Control: no-cache", "Expect: 100-continue") - def apply(settings: HttpHeaderParser.Settings)(warnOnIllegalHeader: ErrorInfo ⇒ Unit = info ⇒ throw new IllegalHeaderException(info)) = + def apply(settings: HttpHeaderParser.Settings)(warnOnIllegalHeader: ErrorInfo ⇒ Unit = info ⇒ throw IllegalHeaderException(info)) = prime(unprimed(settings, warnOnIllegalHeader)) def unprimed(settings: HttpHeaderParser.Settings, warnOnIllegalHeader: ErrorInfo ⇒ Unit) = diff --git a/akka-http-core/src/main/scala/akka/http/engine/rendering/RenderSupport.scala b/akka-http-core/src/main/scala/akka/http/engine/rendering/RenderSupport.scala index 6b5c577449..baa990302c 100644 --- a/akka-http-core/src/main/scala/akka/http/engine/rendering/RenderSupport.scala +++ b/akka-http-core/src/main/scala/akka/http/engine/rendering/RenderSupport.scala @@ -75,13 +75,13 @@ private object RenderSupport { override def onPush(elem: ByteString, ctx: Context[ByteString]): Directive = { sent += elem.length if (sent > length) - throw new InvalidContentLengthException(s"HTTP message had declared Content-Length $length but entity data stream amounts to more bytes") + throw InvalidContentLengthException(s"HTTP message had declared Content-Length $length but entity data stream amounts to more bytes") ctx.push(elem) } override def onUpstreamFinish(ctx: Context[ByteString]): TerminationDirective = { if (sent < length) - throw new InvalidContentLengthException(s"HTTP message had declared Content-Length $length but entity data stream amounts to ${length - sent} bytes less") + throw InvalidContentLengthException(s"HTTP message had declared Content-Length $length but entity data stream amounts to ${length - sent} bytes less") ctx.finish() } diff --git a/akka-http-core/src/main/scala/akka/http/model/ErrorInfo.scala b/akka-http-core/src/main/scala/akka/http/model/ErrorInfo.scala index 13b5797a85..ff915e5de7 100644 --- a/akka-http-core/src/main/scala/akka/http/model/ErrorInfo.scala +++ b/akka-http-core/src/main/scala/akka/http/model/ErrorInfo.scala @@ -4,7 +4,7 @@ package akka.http.model -import StatusCodes.{ ClientError, ServerError } +import StatusCodes.ClientError /** * Two-level model of error information. @@ -33,34 +33,43 @@ object ErrorInfo { } /** Marker for exceptions that provide an ErrorInfo */ -abstract case class ExceptionWithErrorInfo(info: ErrorInfo) extends RuntimeException(info.formatPretty) +abstract class ExceptionWithErrorInfo(info: ErrorInfo) extends RuntimeException(info.formatPretty) -class IllegalUriException(info: ErrorInfo) extends ExceptionWithErrorInfo(info) { - def this(summary: String, detail: String = "") = this(ErrorInfo(summary, detail)) +case class IllegalUriException(info: ErrorInfo) extends ExceptionWithErrorInfo(info) +object IllegalUriException { + def apply(summary: String, detail: String = ""): IllegalUriException = apply(ErrorInfo(summary, detail)) } -class IllegalHeaderException(info: ErrorInfo) extends ExceptionWithErrorInfo(info) { - def this(summary: String, detail: String = "") = this(ErrorInfo(summary, detail)) +case class IllegalHeaderException(info: ErrorInfo) extends ExceptionWithErrorInfo(info) +object IllegalHeaderException { + def apply(summary: String, detail: String = ""): IllegalHeaderException = apply(ErrorInfo(summary, detail)) } -class InvalidContentLengthException(info: ErrorInfo) extends ExceptionWithErrorInfo(info) { - def this(summary: String, detail: String = "") = this(ErrorInfo(summary, detail)) +case class InvalidContentLengthException(info: ErrorInfo) extends ExceptionWithErrorInfo(info) +object InvalidContentLengthException { + def apply(summary: String, detail: String = ""): InvalidContentLengthException = apply(ErrorInfo(summary, detail)) } -class ParsingException(info: ErrorInfo) extends ExceptionWithErrorInfo(info) { - def this(summary: String, detail: String = "") = this(ErrorInfo(summary, detail)) +case class ParsingException(info: ErrorInfo) extends ExceptionWithErrorInfo(info) +object ParsingException { + def apply(summary: String, detail: String = ""): ParsingException = apply(ErrorInfo(summary, detail)) } -class IllegalRequestException private (info: ErrorInfo, val status: ClientError) - extends ExceptionWithErrorInfo(info) { - def this(status: ClientError) = this(ErrorInfo(status.defaultMessage), status) - def this(status: ClientError, info: ErrorInfo) = this(info.withFallbackSummary(status.defaultMessage), status) - def this(status: ClientError, detail: String) = this(ErrorInfo(status.defaultMessage, detail), status) +case class IllegalRequestException(info: ErrorInfo, status: ClientError) extends ExceptionWithErrorInfo(info) +object IllegalRequestException { + def apply(status: ClientError): IllegalRequestException = apply(ErrorInfo(status.defaultMessage), status) + def apply(status: ClientError, info: ErrorInfo): IllegalRequestException = apply(info.withFallbackSummary(status.defaultMessage), status) + def apply(status: ClientError, detail: String): IllegalRequestException = apply(ErrorInfo(status.defaultMessage, detail), status) } -class RequestProcessingException private (info: ErrorInfo, val status: ServerError) - extends ExceptionWithErrorInfo(info) { - def this(status: ServerError) = this(ErrorInfo(status.defaultMessage), status) - def this(status: ServerError, info: ErrorInfo) = this(info.withFallbackSummary(status.defaultMessage), status) - def this(status: ServerError, detail: String) = this(ErrorInfo(status.defaultMessage, detail), status) -} \ No newline at end of file +case class IllegalResponseException(info: ErrorInfo) extends ExceptionWithErrorInfo(info) +object IllegalResponseException { + def apply(summary: String, detail: String = ""): IllegalResponseException = apply(ErrorInfo(summary, detail)) +} + +case class EntityStreamException(info: ErrorInfo) extends ExceptionWithErrorInfo(info) +object EntityStreamException { + def apply(summary: String, detail: String = ""): EntityStreamException = apply(ErrorInfo(summary, detail)) +} + +case class RequestTimeoutException(request: HttpRequest, message: String) extends RuntimeException(message) diff --git a/akka-http-core/src/main/scala/akka/http/model/HttpMessage.scala b/akka-http-core/src/main/scala/akka/http/model/HttpMessage.scala index 50cd953837..8aa9ef7ea1 100644 --- a/akka-http-core/src/main/scala/akka/http/model/HttpMessage.scala +++ b/akka-http-core/src/main/scala/akka/http/model/HttpMessage.scala @@ -270,7 +270,7 @@ object HttpRequest { val hostHeader = headers.collectFirst { case x: Host ⇒ x } if (uri.isRelative) { def fail(detail: String) = - throw new IllegalUriException(s"Cannot establish effective URI of request to `$uri`, request has a relative URI and $detail") + throw IllegalUriException(s"Cannot establish effective URI of request to `$uri`, request has a relative URI and $detail") val Host(host, port) = hostHeader match { case None ⇒ if (defaultHostHeader.isEmpty) fail("is missing a `Host` header") else defaultHostHeader case Some(x) if x.isEmpty ⇒ if (defaultHostHeader.isEmpty) fail("an empty `Host` header") else defaultHostHeader @@ -280,7 +280,7 @@ object HttpRequest { } else // http://tools.ietf.org/html/rfc7230#section-5.4 if (hostHeader.isEmpty || uri.authority.isEmpty && hostHeader.get.isEmpty || hostHeader.get.host.equalsIgnoreCase(uri.authority.host)) uri - else throw new IllegalUriException(s"'Host' header value of request to `$uri` doesn't match request target authority", + else throw IllegalUriException(s"'Host' header value of request to `$uri` doesn't match request target authority", s"Host header: $hostHeader\nrequest target authority: ${uri.authority}") } diff --git a/akka-http-core/src/main/scala/akka/http/model/Multipart.scala b/akka-http-core/src/main/scala/akka/http/model/Multipart.scala index c37ade5304..84d0641915 100644 --- a/akka-http-core/src/main/scala/akka/http/model/Multipart.scala +++ b/akka-http-core/src/main/scala/akka/http/model/Multipart.scala @@ -108,13 +108,13 @@ object Multipart { val params = dispositionParams params.get("name") match { case Some(name) ⇒ Success(f(name, params - "name", headers.filterNot(_ is "content-disposition"))) - case None ⇒ Failure(new IllegalHeaderException("multipart/form-data part must contain `Content-Disposition` header with `name` parameter")) + case None ⇒ Failure(IllegalHeaderException("multipart/form-data part must contain `Content-Disposition` header with `name` parameter")) } } private[BodyPart] def tryCreateByteRangesBodyPart[T](f: (ContentRange, RangeUnit, immutable.Seq[HttpHeader]) ⇒ T): Try[T] = headers.collectFirst { case x: `Content-Range` ⇒ x } match { case Some(`Content-Range`(unit, range)) ⇒ Success(f(range, unit, headers.filterNot(_ is "content-range"))) - case None ⇒ Failure(new IllegalHeaderException("multipart/byteranges part must contain `Content-Range` header")) + case None ⇒ Failure(IllegalHeaderException("multipart/byteranges part must contain `Content-Range` header")) } } object BodyPart { diff --git a/akka-http-core/src/main/scala/akka/http/model/Uri.scala b/akka-http-core/src/main/scala/akka/http/model/Uri.scala index 3dd52a7444..54100256c5 100644 --- a/akka-http-core/src/main/scala/akka/http/model/Uri.scala +++ b/akka-http-core/src/main/scala/akka/http/model/Uri.scala @@ -723,8 +723,7 @@ object Uri { if (hasDotOrDotDotSegment(path)) process(path) else path } - private[http] def fail(summary: String, detail: String = "") = - throw new IllegalUriException(summary, detail) + private[http] def fail(summary: String, detail: String = "") = throw IllegalUriException(summary, detail) private[http] def create(scheme: String, userinfo: String, host: Host, port: Int, path: Path, query: Query, fragment: Option[String]): Uri = diff --git a/akka-http-core/src/main/scala/akka/http/model/parser/CommonRules.scala b/akka-http-core/src/main/scala/akka/http/model/parser/CommonRules.scala index d9ed5dcd30..3a037546af 100644 --- a/akka-http-core/src/main/scala/akka/http/model/parser/CommonRules.scala +++ b/akka-http-core/src/main/scala/akka/http/model/parser/CommonRules.scala @@ -401,7 +401,7 @@ private[parser] trait CommonRules { this: Parser with StringBuilding ⇒ private def createDateTime(year: Int, month: Int, day: Int, hour: Int, min: Int, sec: Int, wkday: Int) = { val dt = DateTime(year, month, day, hour, min, sec) if (dt.weekday != wkday) - throw new ParsingException(s"Illegal weekday in date $dt: is '${DateTime.weekday(wkday)}' but " + + throw ParsingException(s"Illegal weekday in date $dt: is '${DateTime.weekday(wkday)}' but " + s"should be '${DateTime.weekday(dt.weekday)}'") dt } diff --git a/akka-http-core/src/main/scala/akka/http/model/parser/HeaderParser.scala b/akka-http-core/src/main/scala/akka/http/model/parser/HeaderParser.scala index b56ab0e4ed..2c9597e911 100644 --- a/akka-http-core/src/main/scala/akka/http/model/parser/HeaderParser.scala +++ b/akka-http-core/src/main/scala/akka/http/model/parser/HeaderParser.scala @@ -37,8 +37,8 @@ private[http] class HeaderParser(val input: ParserInput) extends Parser with Dyn def success(result: HttpHeader :: HNil): Result = Right(result.head) def parseError(error: ParseError): Result = Left(errorInfo(error)) def failure(error: Throwable): Result = error match { - case e: IllegalUriException ⇒ Left(e.info) - case NonFatal(e) ⇒ Left(ErrorInfo.fromCompoundString(e.getMessage)) + case IllegalUriException(info) ⇒ Left(info) + case NonFatal(e) ⇒ Left(ErrorInfo.fromCompoundString(e.getMessage)) } def ruleNotFound(ruleName: String): Result = throw HeaderParser.RuleNotFoundException } diff --git a/akka-http-core/src/main/scala/akka/http/model/parser/LinkHeader.scala b/akka-http-core/src/main/scala/akka/http/model/parser/LinkHeader.scala index 2ebdf46738..c59a7be14a 100644 --- a/akka-http-core/src/main/scala/akka/http/model/parser/LinkHeader.scala +++ b/akka-http-core/src/main/scala/akka/http/model/parser/LinkHeader.scala @@ -56,7 +56,7 @@ private[parser] trait LinkHeader { this: Parser with CommonRules with CommonActi capture(oneOrMore(!'"' ~ !';' ~ !',' ~ VCHAR)) ~> { s ⇒ try new UriParser(s).parseUriReference() catch { - case e: IllegalUriException ⇒ throw new ParsingException(e.info.withSummaryPrepended("Illegal `Link` header relation-type")) + case IllegalUriException(info) ⇒ throw ParsingException(info.withSummaryPrepended("Illegal `Link` header relation-type")) } s } diff --git a/akka-http-core/src/test/scala/akka/http/model/UriSpec.scala b/akka-http-core/src/test/scala/akka/http/model/UriSpec.scala index 2aa5164e22..7e4bd23232 100644 --- a/akka-http-core/src/test/scala/akka/http/model/UriSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/model/UriSpec.scala @@ -143,7 +143,7 @@ class UriSpec extends WordSpec with Matchers { "not accept illegal IPv6 literals" in { // 5 char quad the[IllegalUriException] thrownBy Host("[::12345]") shouldBe { - new IllegalUriException("Illegal URI host: Invalid input '5', expected !HEXDIG, ':' or ']' (line 1, column 8)", + IllegalUriException("Illegal URI host: Invalid input '5', expected !HEXDIG, ':' or ']' (line 1, column 8)", "[::12345]\n" + " ^") } @@ -443,42 +443,42 @@ class UriSpec extends WordSpec with Matchers { "produce proper error messages for illegal URIs" in { // illegal scheme the[IllegalUriException] thrownBy Uri("foö:/a") shouldBe { - new IllegalUriException("Illegal URI reference: Invalid input 'ö', expected scheme-char, ':', path-segment-char, '%', '/', '?', '#' or 'EOI' (line 1, column 3)", + IllegalUriException("Illegal URI reference: Invalid input 'ö', expected scheme-char, ':', path-segment-char, '%', '/', '?', '#' or 'EOI' (line 1, column 3)", "foö:/a\n" + " ^") } // illegal userinfo the[IllegalUriException] thrownBy Uri("http://user:ö@host") shouldBe { - new IllegalUriException("Illegal URI reference: Invalid input 'ö', expected userinfo-char, '%', '@' or DIGIT (line 1, column 13)", + IllegalUriException("Illegal URI reference: Invalid input 'ö', expected userinfo-char, '%', '@' or DIGIT (line 1, column 13)", "http://user:ö@host\n" + " ^") } // illegal percent-encoding the[IllegalUriException] thrownBy Uri("http://use%2G@host") shouldBe { - new IllegalUriException("Illegal URI reference: Invalid input 'G', expected HEXDIG (line 1, column 13)", + IllegalUriException("Illegal URI reference: Invalid input 'G', expected HEXDIG (line 1, column 13)", "http://use%2G@host\n" + " ^") } // illegal path the[IllegalUriException] thrownBy Uri("http://www.example.com/name with spaces/") shouldBe { - new IllegalUriException("Illegal URI reference: Invalid input ' ', expected path-segment-char, '%', '/', '?', '#' or 'EOI' (line 1, column 28)", + IllegalUriException("Illegal URI reference: Invalid input ' ', expected path-segment-char, '%', '/', '?', '#' or 'EOI' (line 1, column 28)", "http://www.example.com/name with spaces/\n" + " ^") } // illegal path with control character the[IllegalUriException] thrownBy Uri("http:///with\newline") shouldBe { - new IllegalUriException("Illegal URI reference: Invalid input '\\n', expected path-segment-char, '%', '/', '?', '#' or 'EOI' (line 1, column 13)", + IllegalUriException("Illegal URI reference: Invalid input '\\n', expected path-segment-char, '%', '/', '?', '#' or 'EOI' (line 1, column 13)", "http:///with\n" + " ^") } // illegal query the[IllegalUriException] thrownBy Uri("?a=b=c") shouldBe { - new IllegalUriException("Illegal URI reference: Invalid input '=', expected '+', query-char, '%', '&', '#' or 'EOI' (line 1, column 5)", + IllegalUriException("Illegal URI reference: Invalid input '=', expected '+', query-char, '%', '&', '#' or 'EOI' (line 1, column 5)", "?a=b=c\n" + " ^") } diff --git a/akka-http-tests/src/test/scala/akka/http/server/directives/CodingDirectivesSpec.scala b/akka-http-tests/src/test/scala/akka/http/server/directives/CodingDirectivesSpec.scala index 6ce7d6e952..1d92623e38 100644 --- a/akka-http-tests/src/test/scala/akka/http/server/directives/CodingDirectivesSpec.scala +++ b/akka-http-tests/src/test/scala/akka/http/server/directives/CodingDirectivesSpec.scala @@ -59,7 +59,7 @@ class CodingDirectivesSpec extends RoutingSpec { decodeRequest(Gzip) { echoRequestContent } } ~> check { status shouldEqual BadRequest - responseAs[String] shouldEqual "The request's encoding is corrupt:\nNot in GZIP format" + responseAs[String] shouldEqual "The request's encoding is corrupt" } } "reject truncated gzip request content" in { @@ -67,7 +67,7 @@ class CodingDirectivesSpec extends RoutingSpec { decodeRequest(Gzip) { echoRequestContent } } ~> check { status shouldEqual BadRequest - responseAs[String] shouldEqual "The request's encoding is corrupt:\nTruncated GZIP stream" + responseAs[String] shouldEqual "The request's encoding is corrupt" } } "reject requests with content encoded with 'deflate'" in { @@ -397,7 +397,7 @@ class CodingDirectivesSpec extends RoutingSpec { Post("/", helloGzipped) ~> `Content-Encoding`(deflate) ~> decompressRequest() { echoRequestContent } ~> check { status shouldEqual BadRequest - responseAs[String] shouldEqual "The request's encoding is corrupt:\nincorrect header check" + responseAs[String] shouldEqual "The request's encoding is corrupt" } } } diff --git a/akka-http/src/main/scala/akka/http/server/ExceptionHandler.scala b/akka-http/src/main/scala/akka/http/server/ExceptionHandler.scala index 6178848f60..f34f367154 100644 --- a/akka-http/src/main/scala/akka/http/server/ExceptionHandler.scala +++ b/akka-http/src/main/scala/akka/http/server/ExceptionHandler.scala @@ -27,15 +27,10 @@ object ExceptionHandler { def default(settings: RoutingSettings)(implicit ec: ExecutionContext): ExceptionHandler = apply(default = true) { - case e: IllegalRequestException ⇒ ctx ⇒ { + case IllegalRequestException(info, status) ⇒ ctx ⇒ { ctx.log.warning("Illegal request {}\n\t{}\n\tCompleting with '{}' response", - ctx.request, e.getMessage, e.status) - ctx.complete(e.status, e.info.format(settings.verboseErrorMessages)) - } - case e: RequestProcessingException ⇒ ctx ⇒ { - ctx.log.warning("Request {} could not be handled normally\n\t{}\n\tCompleting with '{}' response", - ctx.request, e.getMessage, e.status) - ctx.complete(e.status, e.info.format(settings.verboseErrorMessages)) + ctx.request, info.formatPretty, status) + ctx.complete(status, info.format(settings.verboseErrorMessages)) } case NonFatal(e) ⇒ ctx ⇒ { ctx.log.error(e, "Error during processing of request {}", ctx.request) diff --git a/akka-http/src/main/scala/akka/http/server/directives/CodingDirectives.scala b/akka-http/src/main/scala/akka/http/server/directives/CodingDirectives.scala index d63f30f693..ccd1d44cab 100644 --- a/akka-http/src/main/scala/akka/http/server/directives/CodingDirectives.scala +++ b/akka-http/src/main/scala/akka/http/server/directives/CodingDirectives.scala @@ -59,9 +59,9 @@ trait CodingDirectives { def applyDecoder = mapRequest(decoder.decode(_).mapEntity(StreamUtils.mapEntityError { case NonFatal(e) ⇒ - new IllegalRequestException( + IllegalRequestException( StatusCodes.BadRequest, - ErrorInfo(s"The request's encoding is corrupt:\n${e.getMessage}")) + ErrorInfo("The request's encoding is corrupt", e.getMessage)) })) requestEntityEmpty | ( diff --git a/akka-http/src/main/scala/akka/http/unmarshalling/MultipartUnmarshallers.scala b/akka-http/src/main/scala/akka/http/unmarshalling/MultipartUnmarshallers.scala index d77407ba52..3539b895e7 100644 --- a/akka-http/src/main/scala/akka/http/unmarshalling/MultipartUnmarshallers.scala +++ b/akka-http/src/main/scala/akka/http/unmarshalling/MultipartUnmarshallers.scala @@ -79,7 +79,7 @@ trait MultipartUnmarshallers { case x ⇒ throw new IllegalStateException("Unexpected entity type from strict BodyPartParser: " + x) } builder += createStrictBodyPart(entity, headers) - case ParseError(errorInfo) ⇒ throw new ParsingException(errorInfo) + case ParseError(errorInfo) ⇒ throw ParsingException(errorInfo) case x ⇒ throw new IllegalStateException(s"Unexpected BodyPartParser result $x in strict case") } createStrict(mediaType, builder.result()) @@ -90,7 +90,7 @@ trait MultipartUnmarshallers { .headAndTail .collect { case (BodyPartStart(headers, createEntity), entityParts) ⇒ createBodyPart(createEntity(entityParts), headers) - case (ParseError(errorInfo), _) ⇒ throw new ParsingException(errorInfo) + case (ParseError(errorInfo), _) ⇒ throw ParsingException(errorInfo) } createStreamed(entity.contentType.mediaType.asInstanceOf[MultipartMediaType], bodyParts) } diff --git a/akka-http/src/main/scala/akka/http/unmarshalling/PredefinedFromEntityUnmarshallers.scala b/akka-http/src/main/scala/akka/http/unmarshalling/PredefinedFromEntityUnmarshallers.scala index 614ead66b1..fed5fcbd1a 100644 --- a/akka-http/src/main/scala/akka/http/unmarshalling/PredefinedFromEntityUnmarshallers.scala +++ b/akka-http/src/main/scala/akka/http/unmarshalling/PredefinedFromEntityUnmarshallers.scala @@ -49,8 +49,8 @@ trait PredefinedFromEntityUnmarshallers extends MultipartUnmarshallers { val query = Uri.Query(string, nioCharset) FormData(query) } catch { - case ex: IllegalUriException ⇒ - throw new IllegalArgumentException(ex.info.formatPretty.replace("Query,", "form content,")) + case IllegalUriException(info) ⇒ + throw new IllegalArgumentException(info.formatPretty.replace("Query,", "form content,")) } } } From 4d907bfb50f89f83b94ff8a1c707bbec0a7be4e5 Mon Sep 17 00:00:00 2001 From: Mathias Date: Thu, 18 Dec 2014 17:04:45 +0100 Subject: [PATCH 05/10] =htc small refactoring in OneHundredContinue support --- .../akka/http/engine/TokenSourceActor.scala | 35 ++++++++ .../akka/http/engine/server/HttpServer.scala | 88 +++++++++++++------ .../engine/server/OneHundredContinue.scala | 66 -------------- 3 files changed, 96 insertions(+), 93 deletions(-) create mode 100644 akka-http-core/src/main/scala/akka/http/engine/TokenSourceActor.scala delete mode 100644 akka-http-core/src/main/scala/akka/http/engine/server/OneHundredContinue.scala diff --git a/akka-http-core/src/main/scala/akka/http/engine/TokenSourceActor.scala b/akka-http-core/src/main/scala/akka/http/engine/TokenSourceActor.scala new file mode 100644 index 0000000000..8ffffec751 --- /dev/null +++ b/akka-http-core/src/main/scala/akka/http/engine/TokenSourceActor.scala @@ -0,0 +1,35 @@ +package akka.http.engine + +import scala.annotation.tailrec +import akka.stream.actor.{ ActorPublisherMessage, ActorPublisher } + +/** + * An actor publisher for producing a simple stream of singleton tokens + * the release of which is triggered by the reception of a [[TokenSourceActor.Trigger]] message. + */ +private[engine] class TokenSourceActor[T](token: T) extends ActorPublisher[T] { + private var triggered = 0 + + def receive = { + case TokenSourceActor.Trigger ⇒ + triggered += 1 + tryDispatch() + + case ActorPublisherMessage.Request(_) ⇒ + tryDispatch() + + case ActorPublisherMessage.Cancel ⇒ + context.stop(self) + } + + @tailrec private def tryDispatch(): Unit = + if (triggered > 0 && totalDemand > 0) { + onNext(token) + triggered -= 1 + tryDispatch() + } +} + +private[engine] object TokenSourceActor { + case object Trigger +} diff --git a/akka-http-core/src/main/scala/akka/http/engine/server/HttpServer.scala b/akka-http-core/src/main/scala/akka/http/engine/server/HttpServer.scala index dfcd54cfdf..ddb57ee22a 100644 --- a/akka-http-core/src/main/scala/akka/http/engine/server/HttpServer.scala +++ b/akka-http-core/src/main/scala/akka/http/engine/server/HttpServer.scala @@ -4,10 +4,10 @@ package akka.http.engine.server +import scala.util.control.NonFatal import akka.actor.{ ActorRef, Props } import akka.util.ByteString import akka.event.LoggingAdapter -import akka.stream.stage.PushPullStage import akka.stream.scaladsl.OperationAttributes._ import akka.stream.FlattenStrategy import akka.stream.scaladsl._ @@ -15,11 +15,9 @@ import akka.stream.stage.PushPullStage import akka.http.engine.parsing.{ HttpHeaderParser, HttpRequestParser } import akka.http.engine.rendering.{ ResponseRenderingContext, HttpResponseRendererFactory } import akka.http.engine.parsing.ParserOutput._ +import akka.http.engine.TokenSourceActor import akka.http.model._ import akka.http.util._ -import akka.http.Http - -import scala.util.control.NonFatal /** * INTERNAL API @@ -44,7 +42,7 @@ private[http] object HttpServer { @volatile var oneHundredContinueRef: Option[ActorRef] = None // FIXME: unnecessary after fixing #16168 val oneHundredContinueSource = Source[OneHundredContinue.type] { Props { - val actor = new OneHundredContinueSourceActor + val actor = new TokenSourceActor(OneHundredContinue) oneHundredContinueRef = Some(actor.context.self) actor } @@ -56,7 +54,7 @@ private[http] object HttpServer { val requestParsing = Flow[ByteString].section(name("rootParser"))(_.transform(() ⇒ // each connection uses a single (private) request parser instance for all its requests // which builds a cache of all header instances seen on that connection - rootParser.createShallowCopy(() ⇒ oneHundredContinueRef))) + rootParser.createShallowCopy(() ⇒ oneHundredContinueRef).stage)) val requestPreparation = Flow[RequestOutput] @@ -129,9 +127,9 @@ private[http] object HttpServer { override def initialCompletionHandling = CompletionHandling( onComplete = (ctx, _) ⇒ { ctx.complete(); SameState }, onError = { - case (ctx, _, error: Http.StreamException) ⇒ + case (ctx, _, EntityStreamException(errorInfo)) ⇒ // the application has forwarded a request entity stream error to the response stream - finishWithError(ctx, "request", StatusCodes.BadRequest, error.info) + finishWithError(ctx, "request", StatusCodes.BadRequest, errorInfo) case (ctx, _, error) ⇒ ctx.error(error) SameState @@ -150,27 +148,63 @@ private[http] object HttpServer { } } } -} -private[server] class ErrorsTo500ResponseRecovery(log: LoggingAdapter) - extends PushPullStage[ResponseRenderingContext, ResponseRenderingContext] { - import akka.stream.stage.Context + /** + * The `Expect: 100-continue` header has a special status in HTTP. + * It allows the client to send an `Expect: 100-continue` header with the request and then pause request sending + * (i.e. hold back sending the request entity). The server reads the request headers, determines whether it wants to + * accept the request and responds with + * + * - `417 Expectation Failed`, if it doesn't support the `100-continue` expectation + * (or if the `Expect` header contains other, unsupported expectations). + * - a `100 Continue` response, + * if it is ready to accept the request entity and the client should go ahead with sending it + * - a final response (like a 4xx to signal some client-side error + * (e.g. if the request entity length is beyond the configured limit) or a 3xx redirect) + * + * Only if the client receives a `100 Continue` response from the server is it allowed to continue sending the request + * entity. In this case it will receive another response after having completed request sending. + * So this special feature breaks the normal "one request - one response" logic of HTTP! + * It therefore requires special handling in all HTTP stacks (client- and server-side). + * + * For us this means: + * + * - on the server-side: + * After having read a `Expect: 100-continue` header with the request we package up an `HttpRequest` instance and send + * it through to the application. Only when (and if) the application then requests data from the entity stream do we + * send out a `100 Continue` response and continue reading the request entity. + * The application can therefore determine itself whether it wants the client to send the request entity + * by deciding whether to look at the request entity data stream or not. + * If the application sends a response *without* having looked at the request entity the client receives this + * response *instead of* the `100 Continue` response and the server closes the connection afterwards. + * + * - on the client-side: + * If the user adds a `Expect: 100-continue` header to the request we need to hold back sending the entity until + * we've received a `100 Continue` response. + */ + case object OneHundredContinue - private[this] var errorResponse: ResponseRenderingContext = _ + final class ErrorsTo500ResponseRecovery(log: LoggingAdapter) + extends PushPullStage[ResponseRenderingContext, ResponseRenderingContext] { - override def onPush(elem: ResponseRenderingContext, ctx: Context[ResponseRenderingContext]) = ctx.push(elem) + import akka.stream.stage.Context - override def onPull(ctx: Context[ResponseRenderingContext]) = - if (ctx.isFinishing) ctx.pushAndFinish(errorResponse) - else ctx.pull() + private[this] var errorResponse: ResponseRenderingContext = _ - override def onUpstreamFailure(error: Throwable, ctx: Context[ResponseRenderingContext]) = - error match { - case NonFatal(e) ⇒ - log.error(e, "Internal server error, sending 500 response") - errorResponse = ResponseRenderingContext(HttpResponse(StatusCodes.InternalServerError), - closeAfterResponseCompletion = true) - ctx.absorbTermination() - case _ ⇒ ctx.fail(error) - } -} + override def onPush(elem: ResponseRenderingContext, ctx: Context[ResponseRenderingContext]) = ctx.push(elem) + + override def onPull(ctx: Context[ResponseRenderingContext]) = + if (ctx.isFinishing) ctx.pushAndFinish(errorResponse) + else ctx.pull() + + override def onUpstreamFailure(error: Throwable, ctx: Context[ResponseRenderingContext]) = + error match { + case NonFatal(e) ⇒ + log.error(e, "Internal server error, sending 500 response") + errorResponse = ResponseRenderingContext(HttpResponse(StatusCodes.InternalServerError), + closeAfterResponseCompletion = true) + ctx.absorbTermination() + case _ ⇒ ctx.fail(error) + } + } +} \ No newline at end of file diff --git a/akka-http-core/src/main/scala/akka/http/engine/server/OneHundredContinue.scala b/akka-http-core/src/main/scala/akka/http/engine/server/OneHundredContinue.scala deleted file mode 100644 index 37c58cad4f..0000000000 --- a/akka-http-core/src/main/scala/akka/http/engine/server/OneHundredContinue.scala +++ /dev/null @@ -1,66 +0,0 @@ -/** - * Copyright (C) 2009-2014 Typesafe Inc. - */ - -package akka.http.engine.server - -import scala.annotation.tailrec -import akka.stream.actor.{ ActorPublisherMessage, ActorPublisher } - -/** - * The `Expect: 100-continue` header has a special status in HTTP. - * It allows the client to send an `Expect: 100-continue` header with the request and then pause request sending - * (i.e. hold back sending the request entity). The server reads the request headers, determines whether it wants to - * accept the request and responds with - * - * - `417 Expectation Failed`, if it doesn't support the `100-continue` expectation - * (or if the `Expect` header contains other, unsupported expectations). - * - a `100 Continue` response, - * if it is ready to accept the request entity and the client should go ahead with sending it - * - a final response (like a 4xx to signal some client-side error - * (e.g. if the request entity length is beyond the configured limit) or a 3xx redirect) - * - * Only if the client receives a `100 Continue` response from the server is it allowed to continue sending the request - * entity. In this case it will receive another response after having completed request sending. - * So this special feature breaks the normal "one request - one response" logic of HTTP! - * It therefore requires special handling in all HTTP stacks (client- and server-side). - * - * For us this means: - * - * - on the server-side: - * After having read a `Expect: 100-continue` header with the request we package up an `HttpRequest` instance and send - * it through to the application. Only when (and if) the application then requests data from the entity stream do we - * send out a `100 Continue` response and continue reading the request entity. - * The application can therefore determine itself whether it wants the client to send the request entity - * by deciding whether to look at the request entity data stream or not. - * If the application sends a response *without* having looked at the request entity the client receives this - * response *instead of* the `100 Continue` response and the server closes the connection afterwards. - * - * - on the client-side: - * If the user adds a `Expect: 100-continue` header to the request we need to hold back sending the entity until - * we've received a `100 Continue` response. - */ -private[engine] case object OneHundredContinue - -private[engine] class OneHundredContinueSourceActor extends ActorPublisher[OneHundredContinue.type] { - private var triggered = 0 - - def receive = { - case OneHundredContinue ⇒ - triggered += 1 - tryDispatch() - - case ActorPublisherMessage.Request(_) ⇒ - tryDispatch() - - case ActorPublisherMessage.Cancel ⇒ - context.stop(self) - } - - @tailrec private def tryDispatch(): Unit = - if (triggered > 0 && totalDemand > 0) { - onNext(OneHundredContinue) - triggered -= 1 - tryDispatch() - } -} \ No newline at end of file From 968e9cc5a7f4a62f5aeadebaf802897b1daf8f06 Mon Sep 17 00:00:00 2001 From: Mathias Date: Thu, 18 Dec 2014 17:05:50 +0100 Subject: [PATCH 06/10] =htc refactor HttpClient stream setup, closes #16510 --- .../akka/http/engine/client/HttpClient.scala | 224 ++++++++++-- .../engine/parsing/HttpMessageParser.scala | 64 ++-- .../engine/parsing/HttpRequestParser.scala | 7 +- .../engine/parsing/HttpResponseParser.scala | 42 ++- .../http/engine/parsing/ParserOutput.scala | 13 +- .../main/scala/akka/http/util/package.scala | 44 +-- .../scala/akka/http/ClientServerSpec.scala | 46 ++- .../http/engine/client/HttpClientSpec.scala | 330 ++++++++++++++++++ .../engine/parsing/RequestParserSpec.scala | 13 +- .../engine/parsing/ResponseParserSpec.scala | 14 +- .../http/engine/server/HttpServerSpec.scala | 4 +- .../testkit/RouteTestResultComponent.scala | 2 +- .../directives/RangeDirectivesSpec.scala | 11 +- 13 files changed, 687 insertions(+), 127 deletions(-) create mode 100644 akka-http-core/src/test/scala/akka/http/engine/client/HttpClientSpec.scala diff --git a/akka-http-core/src/main/scala/akka/http/engine/client/HttpClient.scala b/akka-http-core/src/main/scala/akka/http/engine/client/HttpClient.scala index 9020237811..89be13f9be 100644 --- a/akka-http-core/src/main/scala/akka/http/engine/client/HttpClient.scala +++ b/akka-http-core/src/main/scala/akka/http/engine/client/HttpClient.scala @@ -5,16 +5,17 @@ package akka.http.engine.client import java.net.InetSocketAddress -import scala.collection.immutable.Queue +import scala.annotation.tailrec +import scala.collection.mutable.ListBuffer +import akka.stream.stage._ import akka.util.ByteString import akka.event.LoggingAdapter import akka.stream.FlattenStrategy import akka.stream.scaladsl._ import akka.stream.scaladsl.OperationAttributes._ -import akka.http.model.{ HttpMethod, HttpRequest, HttpResponse } +import akka.http.model.{ IllegalResponseException, HttpMethod, HttpRequest, HttpResponse } import akka.http.engine.rendering.{ RequestRenderingContext, HttpRequestRendererFactory } -import akka.http.engine.parsing.{ HttpHeaderParser, HttpResponseParser } -import akka.http.engine.parsing.ParserOutput._ +import akka.http.engine.parsing.{ ParserOutput, HttpHeaderParser, HttpResponseParser } import akka.http.util._ /** @@ -37,39 +38,212 @@ private[http] object HttpClient { }) val requestRendererFactory = new HttpRequestRendererFactory(userAgentHeader, requestHeaderSizeHint, log) - val requestMethodByPass = new RequestMethodByPass(remoteAddress) - Flow[HttpRequest] - .map(requestMethodByPass) + /* + Basic Stream Setup + ================== + + requestIn +----------+ + +-----------------------------------------------+--->| Termi- | requestRendering + | | nation +---------------------> | + +-------------------------------------->| Merge | | + | Termination Backchannel | +----------+ | TCP- + | | | level + | | Method | client + | +------------+ | Bypass | flow + responseOut | responsePrep | Response |<---+ | + <------------+----------------| Parsing | | + | Merge |<------------------------------------------ V + +------------+ + */ + + val requestIn = UndefinedSource[HttpRequest] + val responseOut = UndefinedSink[HttpResponse] + + val methodBypassFanout = Broadcast[HttpRequest] + val responseParsingMerge = new ResponseParsingMerge(rootParser) + + val terminationFanout = Broadcast[HttpResponse] + val terminationMerge = new TerminationMerge + + val requestRendering = Flow[HttpRequest] + .map(RequestRenderingContext(_, remoteAddress)) .section(name("renderer"))(_.transform(() ⇒ requestRendererFactory.newRenderer)) .flatten(FlattenStrategy.concat) + + val transportFlow = Flow[ByteString] .section(name("errorLogger"))(_.transform(() ⇒ errorLogger(log, "Outgoing request stream error"))) .via(transport) - .section(name("rootParser"))(_.transform(() ⇒ - // each connection uses a single (private) response parser instance for all its responses - // which builds a cache of all header instances seen on that connection - rootParser.createShallowCopy(requestMethodByPass))) - .splitWhen(_.isInstanceOf[MessageStart]) + + val methodBypass = Flow[HttpRequest].map(_.method) + + import ParserOutput._ + val responsePrep = Flow[List[ResponseOutput]] + .transform(recover { case x: ResponseParsingError ⇒ x.error :: Nil }) // FIXME after #16565 + .mapConcat(identityFunc) + .splitWhen(x ⇒ x.isInstanceOf[MessageStart] || x == MessageEnd) .headAndTail .collect { case (ResponseStart(statusCode, protocol, headers, createEntity, _), entityParts) ⇒ HttpResponse(statusCode, headers, createEntity(entityParts), protocol) + case (MessageStartError(_, info), _) ⇒ throw IllegalResponseException(info) } + + import FlowGraphImplicits._ + + Flow() { implicit b ⇒ + requestIn ~> methodBypassFanout ~> terminationMerge.requestInput ~> requestRendering ~> transportFlow ~> + responseParsingMerge.dataInput ~> responsePrep ~> terminationFanout ~> responseOut + methodBypassFanout ~> methodBypass ~> responseParsingMerge.methodBypassInput + terminationFanout ~> terminationMerge.terminationBackchannelInput + + b.allowCycles() + + requestIn -> responseOut + } } - // FIXME: refactor to a pure-stream design that allows us to get rid of this ad-hoc queue here - class RequestMethodByPass(serverAddress: InetSocketAddress) - extends (HttpRequest ⇒ RequestRenderingContext) with (() ⇒ HttpMethod) { - private[this] var requestMethods = Queue.empty[HttpMethod] - def apply(request: HttpRequest) = { - requestMethods = requestMethods.enqueue(request.method) - RequestRenderingContext(request, serverAddress) + // a simple merge stage that simply forwards its first input and ignores its second input + // (the terminationBackchannelInput), but applies a special completion handling + class TerminationMerge extends FlexiMerge[HttpRequest] { + import FlexiMerge._ + val requestInput = createInputPort[HttpRequest]() + val terminationBackchannelInput = createInputPort[HttpResponse]() + + def createMergeLogic() = new MergeLogic[HttpRequest] { + override def inputHandles(inputCount: Int) = { + require(inputCount == 2, s"TerminationMerge must have 2 connected inputs, was $inputCount") + Vector(requestInput, terminationBackchannelInput) + } + + override def initialState = State[Any](ReadAny(requestInput, terminationBackchannelInput)) { + case (ctx, _, request: HttpRequest) ⇒ { ctx.emit(request); SameState } + case _ ⇒ SameState // simply drop all responses, we are only interested in the completion of the response input + } + + override def initialCompletionHandling = CompletionHandling( + onComplete = { + case (ctx, `requestInput`) ⇒ SameState + case (ctx, `terminationBackchannelInput`) ⇒ + ctx.complete() + SameState + }, + onError = defaultCompletionHandling.onError) } - def apply(): HttpMethod = - if (requestMethods.nonEmpty) { - val method = requestMethods.head - requestMethods = requestMethods.tail - method - } else HttpResponseParser.NoMethod + } + + import ParserOutput._ + + /** + * A FlexiMerge that follows this logic: + * 1. Wait on the methodBypass for the method of the request corresponding to the next response to be received + * 2. Read from the dataInput until exactly one response has been fully received + * 3. Go back to 1. + */ + class ResponseParsingMerge(rootParser: HttpResponseParser) extends FlexiMerge[List[ResponseOutput]] { + import FlexiMerge._ + val dataInput = createInputPort[ByteString]() + val methodBypassInput = createInputPort[HttpMethod]() + + def createMergeLogic() = new MergeLogic[List[ResponseOutput]] { + // each connection uses a single (private) response parser instance for all its responses + // which builds a cache of all header instances seen on that connection + val parser = rootParser.createShallowCopy() + var methodBypassCompleted = false + + override def inputHandles(inputCount: Int) = { + require(inputCount == 2, s"ResponseParsingMerge must have 2 connected inputs, was $inputCount") + Vector(dataInput, methodBypassInput) + } + + override val initialState: State[HttpMethod] = + State(Read(methodBypassInput)) { + case (ctx, _, method) ⇒ + parser.setRequestMethodForNextResponse(method) + drainParser(parser.onPush(ByteString.empty), ctx, + onNeedNextMethod = () ⇒ SameState, + onNeedMoreData = () ⇒ { + ctx.changeCompletionHandling(responseReadingCompletionHandling) + responseReadingState + }) + } + + val responseReadingState: State[ByteString] = + State(Read(dataInput)) { + case (ctx, _, bytes) ⇒ + drainParser(parser.onPush(bytes), ctx, + onNeedNextMethod = () ⇒ { + if (methodBypassCompleted) { + ctx.complete() + SameState + } else { + ctx.changeCompletionHandling(initialCompletionHandling) + initialState + } + }, + onNeedMoreData = () ⇒ SameState) + } + + @tailrec def drainParser(current: ResponseOutput, ctx: MergeLogicContext, + onNeedNextMethod: () ⇒ State[_], onNeedMoreData: () ⇒ State[_], + b: ListBuffer[ResponseOutput] = ListBuffer.empty): State[_] = { + def emit(output: List[ResponseOutput]): Unit = if (output.nonEmpty) ctx.emit(output) + current match { + case NeedNextRequestMethod ⇒ + emit(b.result()) + onNeedNextMethod() + case StreamEnd ⇒ + emit(b.result()) + ctx.complete() + SameState + case NeedMoreData ⇒ + emit(b.result()) + onNeedMoreData() + case x ⇒ drainParser(parser.onPull(), ctx, onNeedNextMethod, onNeedMoreData, b += x) + } + } + + override val initialCompletionHandling = CompletionHandling( + onComplete = (ctx, _) ⇒ { ctx.complete(); SameState }, + onError = defaultCompletionHandling.onError) + + val responseReadingCompletionHandling = CompletionHandling( + onComplete = { + case (ctx, `methodBypassInput`) ⇒ + methodBypassCompleted = true + SameState + case (ctx, `dataInput`) ⇒ + if (parser.onUpstreamFinish()) { + ctx.complete() + } else { + // not pretty but because the FlexiMerge doesn't let us emit from here (#16565) + // we need to funnel the error through the error channel + ctx.error(new ResponseParsingError(parser.onPull().asInstanceOf[ErrorOutput])) + } + SameState + }, + onError = defaultCompletionHandling.onError) + } + } + + private class ResponseParsingError(val error: ErrorOutput) extends RuntimeException + + // TODO: remove after #16394 is cleared + def recover[A, B >: A](pf: PartialFunction[Throwable, B]): () ⇒ PushPullStage[A, B] = { + val stage = new PushPullStage[A, B] { + var recovery: Option[B] = None + def onPush(elem: A, ctx: Context[B]): Directive = ctx.push(elem) + def onPull(ctx: Context[B]): Directive = recovery match { + case None ⇒ ctx.pull() + case Some(x) ⇒ { recovery = null; ctx.push(x) } + case null ⇒ ctx.finish() + } + override def onUpstreamFailure(cause: Throwable, ctx: Context[B]): TerminationDirective = + if (pf isDefinedAt cause) { + recovery = Some(pf(cause)) + ctx.absorbTermination() + } else super.onUpstreamFailure(cause, ctx) + } + () ⇒ stage } } \ No newline at end of file diff --git a/akka-http-core/src/main/scala/akka/http/engine/parsing/HttpMessageParser.scala b/akka-http-core/src/main/scala/akka/http/engine/parsing/HttpMessageParser.scala index ef3367ee8a..c2288ccdf0 100644 --- a/akka-http-core/src/main/scala/akka/http/engine/parsing/HttpMessageParser.scala +++ b/akka-http-core/src/main/scala/akka/http/engine/parsing/HttpMessageParser.scala @@ -10,7 +10,6 @@ import akka.parboiled2.CharUtils import akka.util.ByteString import akka.stream.scaladsl.Source import akka.stream.stage._ -import akka.http.Http.StreamException import akka.http.model.parser.CharacterClasses import akka.http.util._ import akka.http.model._ @@ -22,21 +21,33 @@ import ParserOutput._ * INTERNAL API */ private[http] abstract class HttpMessageParser[Output >: MessageOutput <: ParserOutput](val settings: ParserSettings, - val headerParser: HttpHeaderParser) - extends PushPullStage[ByteString, Output] { + val headerParser: HttpHeaderParser) { self ⇒ import HttpMessageParser._ import settings._ - sealed trait StateResult // phantom type for ensuring soundness of our parsing method setup - final case class Trampoline(f: ByteString ⇒ StateResult) extends StateResult - private[this] val result = new ListBuffer[Output] private[this] var state: ByteString ⇒ StateResult = startNewMessage(_, 0) private[this] var protocol: HttpProtocol = `HTTP/1.1` private[this] var completionHandling: CompletionHandling = CompletionOk private[this] var terminated = false - override def onPush(input: ByteString, ctx: Context[Output]): Directive = { + def isTerminated = terminated + + val stage: PushPullStage[ByteString, Output] = + new PushPullStage[ByteString, Output] { + def onPush(elem: ByteString, ctx: Context[Output]) = handleParserOutput(self.onPush(elem), ctx) + def onPull(ctx: Context[Output]) = handleParserOutput(self.onPull(), ctx) + private def handleParserOutput(output: Output, ctx: Context[Output]): Directive = + output match { + case StreamEnd ⇒ ctx.finish() + case NeedMoreData ⇒ ctx.pull() + case x ⇒ ctx.push(x) + } + override def onUpstreamFinish(ctx: Context[Output]): TerminationDirective = + if (self.onUpstreamFinish()) ctx.finish() else ctx.absorbTermination() + } + + final def onPush(input: ByteString): Output = { @tailrec def run(next: ByteString ⇒ StateResult): StateResult = (try next(input) catch { @@ -51,37 +62,32 @@ private[http] abstract class HttpMessageParser[Output >: MessageOutput <: Parser if (result.nonEmpty) throw new IllegalStateException("Unexpected `onPush`") run(state) - pushResultHeadAndFinishOrPull(ctx) + onPull() } - def onPull(ctx: Context[Output]): Directive = pushResultHeadAndFinishOrPull(ctx) - - def pushResultHeadAndFinishOrPull(ctx: Context[Output]): Directive = + final def onPull(): Output = if (result.nonEmpty) { val head = result.head result.remove(0) // faster than `ListBuffer::drop` - ctx.push(head) - } else if (terminated) ctx.finish() else ctx.pull() + head + } else if (terminated) StreamEnd else NeedMoreData - override def onUpstreamFinish(ctx: Context[Output]) = { + final def onUpstreamFinish(): Boolean = { completionHandling() match { - case Some(x) ⇒ emit(x.asInstanceOf[Output]) + case Some(x) ⇒ emit(x) case None ⇒ // nothing to do } terminated = true - if (result.isEmpty) ctx.finish() else ctx.absorbTermination() + result.isEmpty } - def startNewMessage(input: ByteString, offset: Int): StateResult = { - def _startNewMessage(input: ByteString, offset: Int): StateResult = - try parseMessage(input, offset) - catch { case NotEnoughDataException ⇒ continue(input, offset)(_startNewMessage) } - + protected final def startNewMessage(input: ByteString, offset: Int): StateResult = { if (offset < input.length) setCompletionHandling(CompletionIsMessageStartError) - _startNewMessage(input, offset) + try parseMessage(input, offset) + catch { case NotEnoughDataException ⇒ continue(input, offset)(startNewMessage) } } - def parseMessage(input: ByteString, offset: Int): StateResult + protected def parseMessage(input: ByteString, offset: Int): StateResult def parseProtocol(input: ByteString, cursor: Int): Int = { def c(ix: Int) = byteChar(input, cursor + ix) @@ -204,7 +210,7 @@ private[http] abstract class HttpMessageParser[Output >: MessageOutput <: Parser val chunkBodyEnd = cursor + chunkSize def result(terminatorLen: Int) = { emit(EntityChunk(HttpEntity.Chunk(input.slice(cursor, chunkBodyEnd), extension))) - trampoline(_ ⇒ parseChunk(input, chunkBodyEnd + terminatorLen, isLastMessage)) + Trampoline(_ ⇒ parseChunk(input, chunkBodyEnd + terminatorLen, isLastMessage)) } byteChar(input, chunkBodyEnd) match { case '\r' if byteChar(input, chunkBodyEnd + 1) == '\n' ⇒ result(2) @@ -255,7 +261,6 @@ private[http] abstract class HttpMessageParser[Output >: MessageOutput <: Parser state = next(_, 0) done() } - def trampoline(next: ByteString ⇒ StateResult): StateResult = Trampoline(next) def failMessageStart(summary: String): StateResult = failMessageStart(summary, "") def failMessageStart(summary: String, detail: String): StateResult = failMessageStart(StatusCodes.BadRequest, summary, detail) @@ -299,7 +304,7 @@ private[http] abstract class HttpMessageParser[Output >: MessageOutput <: Parser transformData: Source[ByteString] ⇒ Source[ByteString] = identityFunc)(entityParts: Source[_ <: ParserOutput]): UniversalEntity = { val data = entityParts.collect { case EntityPart(bytes) ⇒ bytes - case EntityStreamError(info) ⇒ throw new StreamException(info) + case EntityStreamError(info) ⇒ throw EntityStreamException(info) } HttpEntity.Default(contentType(cth), contentLength, transformData(data)) } @@ -308,7 +313,7 @@ private[http] abstract class HttpMessageParser[Output >: MessageOutput <: Parser transformChunks: Source[HttpEntity.ChunkStreamPart] ⇒ Source[HttpEntity.ChunkStreamPart] = identityFunc)(entityChunks: Source[_ <: ParserOutput]): RequestEntity = { val chunks = entityChunks.collect { case EntityChunk(chunk) ⇒ chunk - case EntityStreamError(info) ⇒ throw new StreamException(info) + case EntityStreamError(info) ⇒ throw EntityStreamException(info) } HttpEntity.Chunked(contentType(cth), transformChunks(chunks)) } @@ -324,7 +329,10 @@ private[http] abstract class HttpMessageParser[Output >: MessageOutput <: Parser } private[http] object HttpMessageParser { - type CompletionHandling = () ⇒ Option[ParserOutput] + sealed trait StateResult // phantom type for ensuring soundness of our parsing method setup + final case class Trampoline(f: ByteString ⇒ StateResult) extends StateResult + + type CompletionHandling = () ⇒ Option[ErrorOutput] val CompletionOk: CompletionHandling = () ⇒ None val CompletionIsMessageStartError: CompletionHandling = () ⇒ Some(ParserOutput.MessageStartError(StatusCodes.BadRequest, ErrorInfo("Illegal HTTP message start"))) diff --git a/akka-http-core/src/main/scala/akka/http/engine/parsing/HttpRequestParser.scala b/akka-http-core/src/main/scala/akka/http/engine/parsing/HttpRequestParser.scala index b5059c5355..792b18675c 100644 --- a/akka-http-core/src/main/scala/akka/http/engine/parsing/HttpRequestParser.scala +++ b/akka-http-core/src/main/scala/akka/http/engine/parsing/HttpRequestParser.scala @@ -11,9 +11,9 @@ import akka.stream.scaladsl.OperationAttributes._ import akka.stream.stage.{ Context, PushPullStage } import akka.stream.scaladsl.Source import akka.util.ByteString -import akka.http.engine.server.OneHundredContinue import akka.http.model.parser.CharacterClasses import akka.http.util.identityFunc +import akka.http.engine.TokenSourceActor import akka.http.model._ import headers._ import StatusCodes._ @@ -27,6 +27,7 @@ private[http] class HttpRequestParser(_settings: ParserSettings, _headerParser: HttpHeaderParser, oneHundredContinueRef: () ⇒ Option[ActorRef] = () ⇒ None) extends HttpMessageParser[RequestOutput](_settings, _headerParser) { + import HttpMessageParser._ import settings._ private[this] var method: HttpMethod = _ @@ -105,7 +106,7 @@ private[http] class HttpRequestParser(_settings: ParserSettings, uriBytes = input.iterator.slice(uriStart, uriEnd).toArray[Byte] // TODO: can we reduce allocations here? uri = Uri.parseHttpRequestTarget(uriBytes, mode = uriParsingMode) } catch { - case e: IllegalUriException ⇒ throw new ParsingException(BadRequest, e.info) + case IllegalUriException(info) ⇒ throw new ParsingException(BadRequest, info) } uriEnd + 1 } @@ -133,7 +134,7 @@ private[http] class HttpRequestParser(_settings: ParserSettings, def onPull(ctx: Context[T]) = { if (!oneHundredContinueSent) { val ref = oneHundredContinueRef().getOrElse(throw new IllegalStateException("oneHundredContinueRef unavailable")) - ref ! OneHundredContinue + ref ! TokenSourceActor.Trigger oneHundredContinueSent = true } ctx.pull() diff --git a/akka-http-core/src/main/scala/akka/http/engine/parsing/HttpResponseParser.scala b/akka-http-core/src/main/scala/akka/http/engine/parsing/HttpResponseParser.scala index 86629da8b2..2e9eabfde8 100644 --- a/akka-http-core/src/main/scala/akka/http/engine/parsing/HttpResponseParser.scala +++ b/akka-http-core/src/main/scala/akka/http/engine/parsing/HttpResponseParser.scala @@ -10,38 +10,41 @@ import akka.stream.scaladsl.Source import akka.util.ByteString import akka.http.model._ import headers._ -import HttpResponseParser.NoMethod import ParserOutput._ /** * INTERNAL API */ -private[http] class HttpResponseParser(_settings: ParserSettings, - _headerParser: HttpHeaderParser, - dequeueRequestMethodForNextResponse: () ⇒ HttpMethod = () ⇒ NoMethod) +private[http] class HttpResponseParser(_settings: ParserSettings, _headerParser: HttpHeaderParser) extends HttpMessageParser[ResponseOutput](_settings, _headerParser) { + import HttpMessageParser._ import settings._ - private[this] var requestMethodForCurrentResponse: HttpMethod = NoMethod + private[this] var requestMethodForCurrentResponse: Option[HttpMethod] = None private[this] var statusCode: StatusCode = StatusCodes.OK - def createShallowCopy(dequeueRequestMethodForNextResponse: () ⇒ HttpMethod): HttpResponseParser = - new HttpResponseParser(settings, headerParser.createShallowCopy(), dequeueRequestMethodForNextResponse) + def createShallowCopy(): HttpResponseParser = new HttpResponseParser(settings, headerParser.createShallowCopy()) - override def startNewMessage(input: ByteString, offset: Int): StateResult = { - requestMethodForCurrentResponse = dequeueRequestMethodForNextResponse() - super.startNewMessage(input, offset) - } + def setRequestMethodForNextResponse(requestMethod: HttpMethod): Unit = + if (requestMethodForCurrentResponse.isEmpty) requestMethodForCurrentResponse = Some(requestMethod) - def parseMessage(input: ByteString, offset: Int): StateResult = - if (requestMethodForCurrentResponse ne NoMethod) { + protected def parseMessage(input: ByteString, offset: Int): StateResult = + if (requestMethodForCurrentResponse.isDefined) { var cursor = parseProtocol(input, offset) if (byteChar(input, cursor) == ' ') { cursor = parseStatusCode(input, cursor + 1) cursor = parseReason(input, cursor)() parseHeaderLines(input, cursor) } else badProtocol - } else failMessageStart("Unexpected server response", input.drop(offset).utf8String) + } else { + emit(NeedNextRequestMethod) + done() + } + + override def emit(output: ResponseOutput): Unit = { + if (output == MessageEnd) requestMethodForCurrentResponse = None + super.emit(output) + } def badProtocol = throw new ParsingException("The server-side HTTP version is not supported") @@ -81,10 +84,11 @@ private[http] class HttpResponseParser(_settings: ParserSettings, def finishEmptyResponse() = { emitResponseStart(emptyEntity(cth)) setCompletionHandling(HttpMessageParser.CompletionOk) + emit(MessageEnd) startNewMessage(input, bodyStart) } - if (statusCode.allowsEntity && (requestMethodForCurrentResponse ne HttpMethods.HEAD)) { + if (statusCode.allowsEntity && (requestMethodForCurrentResponse.get != HttpMethods.HEAD)) { teh match { case None ⇒ clh match { case Some(`Content-Length`(contentLength)) ⇒ @@ -95,6 +99,7 @@ private[http] class HttpResponseParser(_settings: ParserSettings, val cl = contentLength.toInt emitResponseStart(strictEntity(cth, input, bodyStart, cl)) setCompletionHandling(HttpMessageParser.CompletionOk) + emit(MessageEnd) startNewMessage(input, bodyStart + cl) } else { emitResponseStart(defaultEntity(cth, contentLength)) @@ -128,11 +133,4 @@ private[http] class HttpResponseParser(_settings: ParserSettings, emit(EntityPart(input drop bodyStart)) continue(parseToCloseBody) } -} - -/** - * INTERNAL API - */ -private[http] object HttpResponseParser { - val NoMethod = HttpMethod.custom("NONE", safe = false, idempotent = false, entityAccepted = false) } \ No newline at end of file diff --git a/akka-http-core/src/main/scala/akka/http/engine/parsing/ParserOutput.scala b/akka-http-core/src/main/scala/akka/http/engine/parsing/ParserOutput.scala index 07daa875f9..ea264f715d 100644 --- a/akka-http-core/src/main/scala/akka/http/engine/parsing/ParserOutput.scala +++ b/akka-http-core/src/main/scala/akka/http/engine/parsing/ParserOutput.scala @@ -21,6 +21,7 @@ private[http] object ParserOutput { sealed trait ResponseOutput extends ParserOutput sealed trait MessageStart extends ParserOutput sealed trait MessageOutput extends RequestOutput with ResponseOutput + sealed trait ErrorOutput extends MessageOutput final case class RequestStart( method: HttpMethod, @@ -44,7 +45,15 @@ private[http] object ParserOutput { final case class EntityChunk(chunk: HttpEntity.ChunkStreamPart) extends MessageOutput - final case class MessageStartError(status: StatusCode, info: ErrorInfo) extends MessageStart with MessageOutput + final case class MessageStartError(status: StatusCode, info: ErrorInfo) extends MessageStart with ErrorOutput - final case class EntityStreamError(info: ErrorInfo) extends MessageOutput + final case class EntityStreamError(info: ErrorInfo) extends ErrorOutput + + //////////// meta messages /////////// + + case object StreamEnd extends MessageOutput + + case object NeedMoreData extends MessageOutput + + case object NeedNextRequestMethod extends ResponseOutput } diff --git a/akka-http-core/src/main/scala/akka/http/util/package.scala b/akka-http-core/src/main/scala/akka/http/util/package.scala index d7808d8ac3..996d3e7347 100644 --- a/akka-http-core/src/main/scala/akka/http/util/package.scala +++ b/akka-http-core/src/main/scala/akka/http/util/package.scala @@ -55,31 +55,25 @@ package object util { .flatten(FlattenStrategy.concat) } - private[http] implicit class EnhancedSource[T](val underlying: Source[T]) { - def printEvent(marker: String): Source[T] = - underlying.transform(() ⇒ new PushStage[T, T] { - override def onPush(element: T, ctx: Context[T]): Directive = { - println(s"$marker: $element") - ctx.push(element) - } - override def onUpstreamFailure(cause: Throwable, ctx: Context[T]): TerminationDirective = { - println(s"$marker: Failure $cause") - super.onUpstreamFailure(cause, ctx) - } - override def onUpstreamFinish(ctx: Context[T]): TerminationDirective = { - println(s"$marker: Terminated") - super.onUpstreamFinish(ctx) - } - }) - - /** - * Drain this stream into a Vector and provide it as a future value. - * - * FIXME: Should be part of akka-streams - */ - def collectAll(implicit materializer: FlowMaterializer): Future[immutable.Seq[T]] = - underlying.fold(Vector.empty[T])(_ :+ _) - } + def printEvent[T](marker: String): Flow[T, T] = + Flow[T].transform(() ⇒ new PushStage[T, T] { + override def onPush(element: T, ctx: Context[T]): Directive = { + println(s"$marker: $element") + ctx.push(element) + } + override def onUpstreamFailure(cause: Throwable, ctx: Context[T]): TerminationDirective = { + println(s"$marker: Error $cause") + super.onUpstreamFailure(cause, ctx) + } + override def onUpstreamFinish(ctx: Context[T]): TerminationDirective = { + println(s"$marker: Complete") + super.onUpstreamFinish(ctx) + } + override def onDownstreamFinish(ctx: Context[T]): TerminationDirective = { + println(s"$marker: Cancel") + super.onDownstreamFinish(ctx) + } + }) private[http] implicit class AddFutureAwaitResult[T](future: Future[T]) { /** "Safe" Await.result that doesn't throw away half of the stacktrace */ diff --git a/akka-http-core/src/test/scala/akka/http/ClientServerSpec.scala b/akka-http-core/src/test/scala/akka/http/ClientServerSpec.scala index 6b4fceea45..602b849209 100644 --- a/akka-http-core/src/test/scala/akka/http/ClientServerSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/ClientServerSpec.scala @@ -36,7 +36,7 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll { implicit val materializer = FlowMaterializer() - "The server-side HTTP infrastructure" should { + "The low-level HTTP infrastructure" should { "properly bind a server" in { val (hostname, port) = temporaryServerHostnameAndPort() @@ -70,6 +70,7 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll { val (serverIn, serverOut) = acceptConnection() val clientOutSub = clientOut.expectSubscription() + clientOutSub.expectRequest() clientOutSub.sendNext(HttpRequest(uri = "/abc")) val serverInSub = serverIn.expectSubscription() @@ -77,12 +78,20 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll { serverIn.expectNext().uri shouldEqual Uri(s"http://$hostname:$port/abc") val serverOutSub = serverOut.expectSubscription() + serverOutSub.expectRequest() serverOutSub.sendNext(HttpResponse(entity = "yeah")) val clientInSub = clientIn.expectSubscription() clientInSub.request(1) val response = clientIn.expectNext() toStrict(response.entity) shouldEqual HttpEntity("yeah") + + clientOutSub.sendComplete() + serverInSub.request(1) // work-around for #16552 + serverIn.expectComplete() + serverOutSub.expectCancellation() + clientInSub.request(1) // work-around for #16552 + clientIn.expectComplete() } "properly complete a chunked request/response cycle" in new TestSetup { @@ -104,6 +113,7 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll { Await.result(chunkStream.grouped(4).runWith(Sink.head), 100.millis) shouldEqual chunks val serverOutSub = serverOut.expectSubscription() + serverOutSub.expectRequest() serverOutSub.sendNext(HttpResponse(206, List(RawHeader("Age", "42")), chunkedEntity)) val clientInSub = clientIn.expectSubscription() @@ -111,8 +121,42 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll { val HttpResponse(StatusCodes.PartialContent, List(RawHeader("Age", "42"), Server(_), Date(_)), Chunked(`chunkedContentType`, chunkStream2), HttpProtocols.`HTTP/1.1`) = clientIn.expectNext() Await.result(chunkStream2.grouped(1000).runWith(Sink.head), 100.millis) shouldEqual chunks + + clientOutSub.sendComplete() + serverInSub.request(1) // work-around for #16552 + serverIn.expectComplete() + serverOutSub.expectCancellation() + clientInSub.request(1) // work-around for #16552 + clientIn.expectComplete() } + "be able to deal with eager closing of the request stream on the client side" in new TestSetup { + val (clientOut, clientIn) = openNewClientConnection() + val (serverIn, serverOut) = acceptConnection() + + val clientOutSub = clientOut.expectSubscription() + clientOutSub.sendNext(HttpRequest(uri = "/abc")) + clientOutSub.sendComplete() // complete early + + val serverInSub = serverIn.expectSubscription() + serverInSub.request(1) + serverIn.expectNext().uri shouldEqual Uri(s"http://$hostname:$port/abc") + + val serverOutSub = serverOut.expectSubscription() + serverOutSub.expectRequest() + serverOutSub.sendNext(HttpResponse(entity = "yeah")) + + val clientInSub = clientIn.expectSubscription() + clientInSub.request(1) + val response = clientIn.expectNext() + toStrict(response.entity) shouldEqual HttpEntity("yeah") + + serverInSub.request(1) // work-around for #16552 + serverIn.expectComplete() + serverOutSub.expectCancellation() + clientInSub.request(1) // work-around for #16552 + clientIn.expectComplete() + } } override def afterAll() = system.shutdown() diff --git a/akka-http-core/src/test/scala/akka/http/engine/client/HttpClientSpec.scala b/akka-http-core/src/test/scala/akka/http/engine/client/HttpClientSpec.scala new file mode 100644 index 0000000000..510b3b9490 --- /dev/null +++ b/akka-http-core/src/test/scala/akka/http/engine/client/HttpClientSpec.scala @@ -0,0 +1,330 @@ +/* + * Copyright (C) 2009-2014 Typesafe Inc. + */ + +package akka.http.engine.client + +import java.net.InetSocketAddress +import org.scalatest.Inside +import akka.util.ByteString +import akka.event.NoLogging +import akka.stream.FlowMaterializer +import akka.stream.testkit.{ AkkaSpec, StreamTestKit } +import akka.stream.scaladsl._ +import akka.http.model.HttpEntity._ +import akka.http.model.HttpMethods._ +import akka.http.model._ +import akka.http.model.headers._ +import akka.http.util._ + +class HttpClientSpec extends AkkaSpec("akka.loggers = []\n akka.loglevel = OFF") with Inside { + implicit val materializer = FlowMaterializer() + + "The client implementation" should { + + "properly handle a request/response round-trip" which { + + "has a request with empty entity" in new TestSetup { + requestsSub.sendNext(HttpRequest()) + expectWireData( + """GET / HTTP/1.1 + |Host: example.com:80 + |User-Agent: akka-http/test + | + |""") + + netInSub.expectRequest(16) + sendWireData( + """HTTP/1.1 200 OK + |Content-Length: 0 + | + |""") + + responsesSub.request(1) + responses.expectNext(HttpResponse()) + + requestsSub.sendComplete() + netOut.expectComplete() + netInSub.sendComplete() + responses.expectComplete() + } + + "has a request with default entity" in new TestSetup { + val probe = StreamTestKit.PublisherProbe[ByteString]() + requestsSub.sendNext(HttpRequest(PUT, entity = HttpEntity(ContentTypes.`application/octet-stream`, 8, Source(probe)))) + expectWireData( + """PUT / HTTP/1.1 + |Host: example.com:80 + |User-Agent: akka-http/test + |Content-Type: application/octet-stream + |Content-Length: 8 + | + |""") + val sub = probe.expectSubscription() + sub.expectRequest(4) + sub.sendNext(ByteString("ABC")) + expectWireData("ABC") + sub.sendNext(ByteString("DEF")) + expectWireData("DEF") + sub.sendNext(ByteString("XY")) + expectWireData("XY") + sub.sendComplete() + + netInSub.expectRequest(16) + sendWireData( + """HTTP/1.1 200 OK + |Content-Length: 0 + | + |""") + + responsesSub.request(1) + responses.expectNext(HttpResponse()) + + requestsSub.sendComplete() + netOut.expectComplete() + netInSub.sendComplete() + responses.expectComplete() + } + + "has a response with a default entity" in new TestSetup { + requestsSub.sendNext(HttpRequest()) + expectWireData( + """GET / HTTP/1.1 + |Host: example.com:80 + |User-Agent: akka-http/test + | + |""") + + netInSub.expectRequest(16) + sendWireData( + """HTTP/1.1 200 OK + |Transfer-Encoding: chunked + | + |""") + + responsesSub.request(1) + val HttpResponse(_, _, HttpEntity.Chunked(ct, chunks), _) = responses.expectNext() + ct shouldEqual ContentTypes.`application/octet-stream` + + val probe = StreamTestKit.SubscriberProbe[ChunkStreamPart]() + chunks.runWith(Sink(probe)) + val sub = probe.expectSubscription() + + sendWireData("3\nABC\n") + sub.request(1) + probe.expectNext(HttpEntity.Chunk("ABC")) + + sendWireData("4\nDEFX\n") + sub.request(1) + probe.expectNext(HttpEntity.Chunk("DEFX")) + + sendWireData("0\n\n") + sub.request(1) + probe.expectNext(HttpEntity.LastChunk) + probe.expectComplete() + + requestsSub.sendComplete() + netOut.expectComplete() + netInSub.sendComplete() + responses.expectComplete() + } + + "exhibits eager request stream completion" in new TestSetup { + requestsSub.sendNext(HttpRequest()) + requestsSub.sendComplete() + expectWireData( + """GET / HTTP/1.1 + |Host: example.com:80 + |User-Agent: akka-http/test + | + |""") + + netInSub.expectRequest(16) + sendWireData( + """HTTP/1.1 200 OK + |Content-Length: 0 + | + |""") + + responsesSub.request(1) + responses.expectNext(HttpResponse()) + + netOut.expectComplete() + netInSub.sendComplete() + responses.expectComplete() + } + } + + "produce proper errors" which { + + "catch the entity stream being shorter than the Content-Length" in new TestSetup { + val probe = StreamTestKit.PublisherProbe[ByteString]() + requestsSub.sendNext(HttpRequest(PUT, entity = HttpEntity(ContentTypes.`application/octet-stream`, 8, Source(probe)))) + expectWireData( + """PUT / HTTP/1.1 + |Host: example.com:80 + |User-Agent: akka-http/test + |Content-Type: application/octet-stream + |Content-Length: 8 + | + |""") + val sub = probe.expectSubscription() + sub.expectRequest(4) + sub.sendNext(ByteString("ABC")) + expectWireData("ABC") + sub.sendNext(ByteString("DEF")) + expectWireData("DEF") + sub.sendComplete() + + val InvalidContentLengthException(info) = netOut.expectError() + info.summary shouldEqual "HTTP message had declared Content-Length 8 but entity data stream amounts to 2 bytes less" + netInSub.sendComplete() + responses.expectComplete() + netInSub.expectCancellation() + } + + "catch the entity stream being longer than the Content-Length" in new TestSetup { + val probe = StreamTestKit.PublisherProbe[ByteString]() + requestsSub.sendNext(HttpRequest(PUT, entity = HttpEntity(ContentTypes.`application/octet-stream`, 8, Source(probe)))) + expectWireData( + """PUT / HTTP/1.1 + |Host: example.com:80 + |User-Agent: akka-http/test + |Content-Type: application/octet-stream + |Content-Length: 8 + | + |""") + val sub = probe.expectSubscription() + sub.expectRequest(4) + sub.sendNext(ByteString("ABC")) + expectWireData("ABC") + sub.sendNext(ByteString("DEF")) + expectWireData("DEF") + sub.sendNext(ByteString("XYZ")) + + val InvalidContentLengthException(info) = netOut.expectError() + info.summary shouldEqual "HTTP message had declared Content-Length 8 but entity data stream amounts to more bytes" + netInSub.sendComplete() + responses.expectComplete() + netInSub.expectCancellation() + } + + "catch illegal response starts" in new TestSetup { + requestsSub.sendNext(HttpRequest()) + expectWireData( + """GET / HTTP/1.1 + |Host: example.com:80 + |User-Agent: akka-http/test + | + |""") + + netInSub.expectRequest(16) + sendWireData( + """HTTP/1.2 200 OK + | + |""") + + val error @ IllegalResponseException(info) = responses.expectError() + info.summary shouldEqual "The server-side HTTP version is not supported" + netOut.expectError(error) + requestsSub.expectCancellation() + } + + "catch illegal response chunks" in new TestSetup { + requestsSub.sendNext(HttpRequest()) + expectWireData( + """GET / HTTP/1.1 + |Host: example.com:80 + |User-Agent: akka-http/test + | + |""") + + netInSub.expectRequest(16) + sendWireData( + """HTTP/1.1 200 OK + |Transfer-Encoding: chunked + | + |""") + + responsesSub.request(1) + val HttpResponse(_, _, HttpEntity.Chunked(ct, chunks), _) = responses.expectNext() + ct shouldEqual ContentTypes.`application/octet-stream` + + val probe = StreamTestKit.SubscriberProbe[ChunkStreamPart]() + chunks.runWith(Sink(probe)) + val sub = probe.expectSubscription() + + sendWireData("3\nABC\n") + sub.request(1) + probe.expectNext(HttpEntity.Chunk("ABC")) + + sendWireData("4\nDEFXX") + sub.request(1) + val error @ EntityStreamException(info) = probe.expectError() + info.summary shouldEqual "Illegal chunk termination" + + responses.expectComplete() + netOut.expectComplete() + requestsSub.expectCancellation() + } + + "catch a response start truncation" in new TestSetup { + requestsSub.sendNext(HttpRequest()) + expectWireData( + """GET / HTTP/1.1 + |Host: example.com:80 + |User-Agent: akka-http/test + | + |""") + + netInSub.expectRequest(16) + sendWireData("HTTP/1.1 200 OK") + netInSub.sendComplete() + + val error @ IllegalResponseException(info) = responses.expectError() + info.summary shouldEqual "Illegal HTTP message start" + netOut.expectError(error) + requestsSub.expectCancellation() + } + } + } + + class TestSetup { + val requests = StreamTestKit.PublisherProbe[HttpRequest] + val responses = StreamTestKit.SubscriberProbe[HttpResponse] + val remoteAddress = new InetSocketAddress("example.com", 80) + + def settings = ClientConnectionSettings(system) + .copy(userAgentHeader = Some(`User-Agent`(List(ProductVersion("akka-http", "test"))))) + + val (netOut, netIn) = { + val netOut = StreamTestKit.SubscriberProbe[ByteString] + val netIn = StreamTestKit.PublisherProbe[ByteString] + val clientFlow = HttpClient.transportToConnectionClientFlow( + Flow(Sink(netOut), Source(netIn)), remoteAddress, settings, NoLogging) + Source(requests).via(clientFlow).runWith(Sink(responses)) + netOut -> netIn + } + + def wipeDate(string: String) = + string.fastSplit('\n').map { + case s if s.startsWith("Date:") ⇒ "Date: XXXX\r" + case s ⇒ s + }.mkString("\n") + + val netInSub = netIn.expectSubscription() + val netOutSub = netOut.expectSubscription() + val requestsSub = requests.expectSubscription() + val responsesSub = responses.expectSubscription() + + def sendWireData(data: String): Unit = sendWireData(ByteString(data.stripMarginWithNewline("\r\n"), "ASCII")) + def sendWireData(data: ByteString): Unit = netInSub.sendNext(data) + + def expectWireData(s: String) = { + netOutSub.request(1) + netOut.expectNext().utf8String shouldEqual s.stripMarginWithNewline("\r\n") + } + + def closeNetworkInput(): Unit = netInSub.sendComplete() + } +} \ No newline at end of file diff --git a/akka-http-core/src/test/scala/akka/http/engine/parsing/RequestParserSpec.scala b/akka-http-core/src/test/scala/akka/http/engine/parsing/RequestParserSpec.scala index 377db41b8d..ad92cd0ea1 100644 --- a/akka-http-core/src/test/scala/akka/http/engine/parsing/RequestParserSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/engine/parsing/RequestParserSpec.scala @@ -120,8 +120,9 @@ class RequestParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll { |Host: x | |ABCDPATCH""" - }.toCharArray.map(_.toString).toSeq should rawMultiParseTo( - HttpRequest(PUT, "/resource/yes", List(Host("x")), "ABCD".getBytes)) + }.toCharArray.map(_.toString).toSeq should generalRawMultiParseTo( + Right(HttpRequest(PUT, "/resource/yes", List(Host("x")), "ABCD".getBytes)), + Left(MessageStartError(400, ErrorInfo("Illegal HTTP message start")))) closeAfterResponseCompletion shouldEqual Seq(false) } @@ -232,7 +233,7 @@ class RequestParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll { val parser = newParser val result = multiParse(newParser)(Seq(prep(start + manyChunks))) val HttpEntity.Chunked(_, chunks) = result.head.right.get.req.entity - val strictChunks = chunks.collectAll.awaitResult(awaitAtMost) + val strictChunks = chunks.grouped(100000).runWith(Sink.head).awaitResult(awaitAtMost) strictChunks.size shouldEqual numChunks } } @@ -442,7 +443,7 @@ class RequestParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll { def multiParse(parser: HttpRequestParser)(input: Seq[String]): Seq[Either[RequestOutput, StrictEqualHttpRequest]] = Source(input.toList) .map(ByteString.apply) - .section(name("parser"))(_.transform(() ⇒ parser)) + .section(name("parser"))(_.transform(() ⇒ parser.stage)) .splitWhen(x ⇒ x.isInstanceOf[MessageStart] || x.isInstanceOf[EntityStreamError]) .headAndTail .collect { @@ -461,7 +462,7 @@ class RequestParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll { } .flatten(FlattenStrategy.concat) .map(strictEqualify) - .collectAll + .grouped(100000).runWith(Sink.head) .awaitResult(awaitAtMost) protected def parserSettings: ParserSettings = ParserSettings(system) @@ -474,7 +475,7 @@ class RequestParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll { } private def compactEntityChunks(data: Source[ChunkStreamPart]): Future[Seq[ChunkStreamPart]] = - data.collectAll + data.grouped(100000).runWith(Sink.head) .fast.recover { case _: NoSuchElementException ⇒ Nil } def prep(response: String) = response.stripMarginWithNewline("\r\n") diff --git a/akka-http-core/src/test/scala/akka/http/engine/parsing/ResponseParserSpec.scala b/akka-http-core/src/test/scala/akka/http/engine/parsing/ResponseParserSpec.scala index 305e36422f..eb067d749d 100644 --- a/akka-http-core/src/test/scala/akka/http/engine/parsing/ResponseParserSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/engine/parsing/ResponseParserSpec.scala @@ -261,7 +261,7 @@ class ResponseParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll { val future = Source(input.toList) .map(ByteString.apply) - .section(name("parser"))(_.transform(() ⇒ newParser(requestMethod))) + .section(name("parser"))(_.transform(() ⇒ newParserStage(requestMethod))) .splitWhen(x ⇒ x.isInstanceOf[MessageStart] || x.isInstanceOf[EntityStreamError]) .headAndTail .collect { @@ -279,14 +279,16 @@ class ResponseParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll { } .flatten(FlattenStrategy.concat) .map(strictEqualify) - .grouped(1000).runWith(Sink.head) + .grouped(100000).runWith(Sink.head) Await.result(future, 500.millis) } def parserSettings: ParserSettings = ParserSettings(system) - def newParser(requestMethod: HttpMethod = GET) = { - val parser = new HttpResponseParser(parserSettings, HttpHeaderParser(parserSettings)(), () ⇒ requestMethod) - parser + + def newParserStage(requestMethod: HttpMethod = GET) = { + val parser = new HttpResponseParser(parserSettings, HttpHeaderParser(parserSettings)()) + parser.setRequestMethodForNextResponse(requestMethod) + parser.stage } private def compactEntity(entity: ResponseEntity): Future[ResponseEntity] = @@ -296,7 +298,7 @@ class ResponseParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll { } private def compactEntityChunks(data: Source[ChunkStreamPart]): Future[Source[ChunkStreamPart]] = - data.grouped(1000).runWith(Sink.head) + data.grouped(100000).runWith(Sink.head) .fast.map(source(_: _*)) .fast.recover { case _: NoSuchElementException ⇒ source() } diff --git a/akka-http-core/src/test/scala/akka/http/engine/server/HttpServerSpec.scala b/akka-http-core/src/test/scala/akka/http/engine/server/HttpServerSpec.scala index b94425def2..d189628e50 100644 --- a/akka-http-core/src/test/scala/akka/http/engine/server/HttpServerSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/engine/server/HttpServerSpec.scala @@ -5,7 +5,7 @@ package akka.http.engine.server import scala.concurrent.duration._ -import org.scalatest.{ Inside, BeforeAndAfterAll, Matchers } +import org.scalatest.Inside import akka.event.NoLogging import akka.util.ByteString import akka.stream.scaladsl._ @@ -18,7 +18,7 @@ import HttpEntity._ import MediaTypes._ import HttpMethods._ -class HttpServerSpec extends AkkaSpec with Matchers with BeforeAndAfterAll with Inside { +class HttpServerSpec extends AkkaSpec("akka.loggers = []\n akka.loglevel = OFF") with Inside { implicit val materializer = FlowMaterializer() "The server implementation" should { diff --git a/akka-http-testkit/src/main/scala/akka/http/testkit/RouteTestResultComponent.scala b/akka-http-testkit/src/main/scala/akka/http/testkit/RouteTestResultComponent.scala index 373c864821..c7637f4dcb 100644 --- a/akka-http-testkit/src/main/scala/akka/http/testkit/RouteTestResultComponent.scala +++ b/akka-http-testkit/src/main/scala/akka/http/testkit/RouteTestResultComponent.scala @@ -96,6 +96,6 @@ trait RouteTestResultComponent { failTest("Request was neither completed nor rejected within " + timeout) private def awaitAllElements[T](data: Source[T]): immutable.Seq[T] = - data.collectAll.awaitResult(timeout) + data.grouped(100000).runWith(Sink.head).awaitResult(timeout) } } \ No newline at end of file diff --git a/akka-http-tests/src/test/scala/akka/http/server/directives/RangeDirectivesSpec.scala b/akka-http-tests/src/test/scala/akka/http/server/directives/RangeDirectivesSpec.scala index 8401186b83..2a10f0a06b 100644 --- a/akka-http-tests/src/test/scala/akka/http/server/directives/RangeDirectivesSpec.scala +++ b/akka-http-tests/src/test/scala/akka/http/server/directives/RangeDirectivesSpec.scala @@ -5,17 +5,16 @@ package akka.http.server package directives +import scala.concurrent.Await +import scala.concurrent.duration._ import akka.http.model.StatusCodes._ import akka.http.model._ import akka.http.model.headers._ import akka.http.util._ -import akka.stream.scaladsl.Source +import akka.stream.scaladsl.{ Sink, Source } import akka.util.ByteString import org.scalatest.{ Inside, Inspectors } -import scala.concurrent.Await -import scala.concurrent.duration._ - class RangeDirectivesSpec extends RoutingSpec with Inspectors with Inside { lazy val wrs = mapSettings(_.copy(rangeCountLimit = 10, rangeCoalescingThreshold = 1L)) & @@ -100,7 +99,7 @@ class RangeDirectivesSpec extends RoutingSpec with Inspectors with Inside { wrs { complete("Some random and not super short entity.") } } ~> check { header[`Content-Range`] should be(None) - val parts = Await.result(responseAs[Multipart.ByteRanges].parts.collectAll, 1.second) + val parts = Await.result(responseAs[Multipart.ByteRanges].parts.grouped(1000).runWith(Sink.head), 1.second) parts.size shouldEqual 2 inside(parts(0)) { case Multipart.ByteRanges.BodyPart(range, entity, unit, headers) ⇒ @@ -125,7 +124,7 @@ class RangeDirectivesSpec extends RoutingSpec with Inspectors with Inside { wrs { complete(HttpEntity.Default(MediaTypes.`text/plain`, content.length, entityData())) } } ~> check { header[`Content-Range`] should be(None) - val parts = Await.result(responseAs[Multipart.ByteRanges].parts.collectAll, 1.second) + val parts = Await.result(responseAs[Multipart.ByteRanges].parts.grouped(1000).runWith(Sink.head), 1.second) parts.size shouldEqual 2 } } From 0f0ce513fff77f8b4da77cf34708d7d837cc6f38 Mon Sep 17 00:00:00 2001 From: Mathias Date: Fri, 19 Dec 2014 14:42:37 +0100 Subject: [PATCH 07/10] =htc small improvement in BodyPartParser --- .../http/engine/parsing/BodyPartParser.scala | 20 ++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/akka-http-core/src/main/scala/akka/http/engine/parsing/BodyPartParser.scala b/akka-http-core/src/main/scala/akka/http/engine/parsing/BodyPartParser.scala index 03c2259b10..36d5a7843d 100644 --- a/akka-http-core/src/main/scala/akka/http/engine/parsing/BodyPartParser.scala +++ b/akka-http-core/src/main/scala/akka/http/engine/parsing/BodyPartParser.scala @@ -5,7 +5,6 @@ package akka.http.engine.parsing import scala.annotation.tailrec -import scala.collection.mutable.ListBuffer import akka.event.LoggingAdapter import akka.parboiled2.CharPredicate import akka.stream.scaladsl.Source @@ -55,8 +54,7 @@ private[http] final class BodyPartParser(defaultContentType: ContentType, if (illegalHeaderWarnings) log.warning(errorInfo.withSummaryPrepended("Illegal multipart header").formatPretty) } - private[this] val result = new ListBuffer[Output] // transformer op is currently optimized for LinearSeqs - private[this] var resultIterator: Iterator[Output] = Iterator.empty + private[this] var output = collection.immutable.Queue.empty[Output] private[this] var state: ByteString ⇒ StateResult = tryParseInitialBoundary private[this] var receivedInitialBoundary = false private[this] var terminated = false @@ -65,7 +63,6 @@ private[http] final class BodyPartParser(defaultContentType: ContentType, if (illegalHeaderWarnings) log.warning(errorInfo.withSummaryPrepended("Illegal multipart header").formatPretty) override def onPush(input: ByteString, ctx: Context[Output]): Directive = { - result.clear() try state(input) catch { case e: ParsingException ⇒ fail(e.info) @@ -73,15 +70,14 @@ private[http] final class BodyPartParser(defaultContentType: ContentType, // we are missing a try/catch{continue} wrapper somewhere throw new IllegalStateException("unexpected NotEnoughDataException", NotEnoughDataException) } - resultIterator = result.iterator - if (resultIterator.hasNext) ctx.push(resultIterator.next()) + if (output.nonEmpty) ctx.push(dequeue()) else if (!terminated) ctx.pull() else ctx.finish() } override def onPull(ctx: Context[Output]): Directive = { - if (resultIterator.hasNext) - ctx.push(resultIterator.next()) + if (output.nonEmpty) + ctx.push(dequeue()) else if (ctx.isFinishing) { if (terminated || !receivedInitialBoundary) ctx.finish() @@ -203,7 +199,13 @@ private[http] final class BodyPartParser(defaultContentType: ContentType, def emit(bytes: ByteString): Unit = if (bytes.nonEmpty) emit(EntityPart(bytes)) - def emit(output: Output): Unit = result += output + def emit(element: Output): Unit = output = output.enqueue(element) + + def dequeue(): Output = { + val head = output.head + output = output.tail + head + } def continue(input: ByteString, offset: Int)(next: (ByteString, Int) ⇒ StateResult): StateResult = { state = From 166be32ce6f36734626b05c530e908455d5d6c30 Mon Sep 17 00:00:00 2001 From: Mathias Date: Fri, 19 Dec 2014 14:48:00 +0100 Subject: [PATCH 08/10] =htc #16574 fix large requests not being consumable on the server-side --- .../scala/akka/http/engine/server/HttpServer.scala | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/akka-http-core/src/main/scala/akka/http/engine/server/HttpServer.scala b/akka-http-core/src/main/scala/akka/http/engine/server/HttpServer.scala index ddb57ee22a..6f1a0d6288 100644 --- a/akka-http-core/src/main/scala/akka/http/engine/server/HttpServer.scala +++ b/akka-http-core/src/main/scala/akka/http/engine/server/HttpServer.scala @@ -67,6 +67,14 @@ private[http] object HttpServer { HttpRequest(effectiveMethod, effectiveUri, headers, createEntity(entityParts), protocol) } + // we need to make sure that only one element per incoming request is queueing up in front of + // the bypassMerge.bypassInput. Otherwise the rising backpressure against the bypassFanout + // would eventually prevent us from reading the remaining request chunks from the transportIn + val bypass = Flow[RequestOutput].filter { + case (_: RequestStart | _: MessageStartError) ⇒ true + case _ ⇒ false + } + val rendererPipeline = Flow[ResponseRenderingContext] .section(name("recover"))(_.transform(() ⇒ new ErrorsTo500ResponseRecovery(log))) // FIXME: simplify after #16394 is closed @@ -82,7 +90,7 @@ private[http] object HttpServer { Flow() { implicit b ⇒ //FIXME: the graph is unnecessary after fixing #15957 transportIn ~> requestParsing ~> bypassFanout ~> requestPreparation ~> serverFlow ~> bypassMerge.applicationInput ~> rendererPipeline ~> transportOut - bypassFanout ~> bypassMerge.bypassInput + bypassFanout ~> bypass ~> bypassMerge.bypassInput oneHundredContinueSource ~> bypassMerge.oneHundredContinueInput b.allowCycles() @@ -107,7 +115,7 @@ private[http] object HttpServer { override val initialState = State[Any](Read(bypassInput)) { case (ctx, _, requestStart: RequestStart) ⇒ waitingForApplicationResponse(requestStart) case (ctx, _, MessageStartError(status, info)) ⇒ finishWithError(ctx, "request", status, info) - case _ ⇒ SameState // drop other parser output + case _ ⇒ throw new IllegalStateException } def waitingForApplicationResponse(requestStart: RequestStart): State[Any] = From 35d1855e0848acf4f18eb5e06e0e6d13fa82ec20 Mon Sep 17 00:00:00 2001 From: Mathias Date: Fri, 19 Dec 2014 15:24:18 +0100 Subject: [PATCH 09/10] =htc make server accept early incoming request stream close This is the fix for #16510 on the server-side. --- .../akka/http/engine/server/HttpServer.scala | 27 ++++++++++++------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/akka-http-core/src/main/scala/akka/http/engine/server/HttpServer.scala b/akka-http-core/src/main/scala/akka/http/engine/server/HttpServer.scala index 6f1a0d6288..0b70c0a281 100644 --- a/akka-http-core/src/main/scala/akka/http/engine/server/HttpServer.scala +++ b/akka-http-core/src/main/scala/akka/http/engine/server/HttpServer.scala @@ -107,18 +107,25 @@ private[http] object HttpServer { val applicationInput = createInputPort[HttpResponse]() def createMergeLogic() = new MergeLogic[ResponseRenderingContext] { + var requestStart: RequestStart = _ + override def inputHandles(inputCount: Int) = { require(inputCount == 3, s"BypassMerge must have 3 connected inputs, was $inputCount") Vector(bypassInput, oneHundredContinueInput, applicationInput) } - override val initialState = State[Any](Read(bypassInput)) { - case (ctx, _, requestStart: RequestStart) ⇒ waitingForApplicationResponse(requestStart) + override val initialState: State[Any] = State[Any](Read(bypassInput)) { + case (ctx, _, requestStart: RequestStart) ⇒ + this.requestStart = requestStart + ctx.changeCompletionHandling(waitingForApplicationResponseCompletionHandling) + waitingForApplicationResponse case (ctx, _, MessageStartError(status, info)) ⇒ finishWithError(ctx, "request", status, info) case _ ⇒ throw new IllegalStateException } - def waitingForApplicationResponse(requestStart: RequestStart): State[Any] = + override val initialCompletionHandling = eagerClose + + val waitingForApplicationResponse = State[Any](ReadAny(oneHundredContinueInput, applicationInput)) { case (ctx, _, response: HttpResponse) ⇒ // see the comment on [[OneHundredContinue]] for an explanation of the closing logic here (and more) @@ -129,18 +136,20 @@ private[http] object HttpServer { case (ctx, _, OneHundredContinue) ⇒ assert(requestStart.expect100ContinueResponsePending) ctx.emit(ResponseRenderingContext(HttpResponse(StatusCodes.Continue))) - waitingForApplicationResponse(requestStart.copy(expect100ContinueResponsePending = false)) + requestStart = requestStart.copy(expect100ContinueResponsePending = false) + SameState } - override def initialCompletionHandling = CompletionHandling( - onComplete = (ctx, _) ⇒ { ctx.complete(); SameState }, + val waitingForApplicationResponseCompletionHandling = CompletionHandling( + onComplete = { + case (ctx, `bypassInput`) ⇒ { requestStart = requestStart.copy(closeAfterResponseCompletion = true); SameState } + case (ctx, _) ⇒ { ctx.complete(); SameState } + }, onError = { case (ctx, _, EntityStreamException(errorInfo)) ⇒ // the application has forwarded a request entity stream error to the response stream finishWithError(ctx, "request", StatusCodes.BadRequest, errorInfo) - case (ctx, _, error) ⇒ - ctx.error(error) - SameState + case (ctx, _, error) ⇒ { ctx.error(error); SameState } }) def finishWithError(ctx: MergeLogicContext, target: String, status: StatusCode, info: ErrorInfo): State[Any] = { From 03362052dd239dfc3c0e634fd0a9edfb63ab4dfa Mon Sep 17 00:00:00 2001 From: Mathias Date: Fri, 19 Dec 2014 16:52:53 +0100 Subject: [PATCH 10/10] =htc add one more test checking against #16574 --- .../akka/http/engine/server/HttpServer.scala | 1 + .../http/engine/server/HttpServerSpec.scala | 43 ++++++++++++++++++- 2 files changed, 43 insertions(+), 1 deletion(-) diff --git a/akka-http-core/src/main/scala/akka/http/engine/server/HttpServer.scala b/akka-http-core/src/main/scala/akka/http/engine/server/HttpServer.scala index 0b70c0a281..7b21b1cc6e 100644 --- a/akka-http-core/src/main/scala/akka/http/engine/server/HttpServer.scala +++ b/akka-http-core/src/main/scala/akka/http/engine/server/HttpServer.scala @@ -66,6 +66,7 @@ private[http] object HttpServer { val effectiveMethod = if (method == HttpMethods.HEAD && settings.transparentHeadRequests) HttpMethods.GET else method HttpRequest(effectiveMethod, effectiveUri, headers, createEntity(entityParts), protocol) } + .take(Int.MaxValue) // FIXME: removing this makes the akka.http.engine.server.HttpServerSpec fail! // we need to make sure that only one element per incoming request is queueing up in front of // the bypassMerge.bypassInput. Otherwise the rising backpressure against the bypassFanout diff --git a/akka-http-core/src/test/scala/akka/http/engine/server/HttpServerSpec.scala b/akka-http-core/src/test/scala/akka/http/engine/server/HttpServerSpec.scala index d189628e50..8b1fca0fe6 100644 --- a/akka-http-core/src/test/scala/akka/http/engine/server/HttpServerSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/engine/server/HttpServerSpec.scala @@ -4,6 +4,8 @@ package akka.http.engine.server +import scala.util.Random +import scala.annotation.tailrec import scala.concurrent.duration._ import org.scalatest.Inside import akka.event.NoLogging @@ -607,6 +609,45 @@ class HttpServerSpec extends AkkaSpec("akka.loggers = []\n akka.loglevel = OFF") | |""".stripMarginWithNewline("\r\n") } + + "correctly consume and render large requests and responses" in new TestSetup { + send("""POST / HTTP/1.1 + |Host: example.com + |Content-Length: 100000 + | + |""".stripMarginWithNewline("\r\n")) + + val HttpRequest(POST, _, _, entity, _) = expectRequest + responsesSub.expectRequest() + responsesSub.sendNext(HttpResponse(entity = entity)) + responsesSub.sendComplete() + + netOutSub.request(1) + wipeDate(netOut.expectNext().utf8String) shouldEqual + """HTTP/1.1 200 OK + |Server: akka-http/test + |Date: XXXX + |Content-Type: application/octet-stream + |Content-Length: 100000 + | + |""".stripMarginWithNewline("\r\n") + + val random = new Random() + @tailrec def rec(bytesLeft: Int): Unit = + if (bytesLeft > 0) { + val count = math.min(random.nextInt(1000) + 1, bytesLeft) + val data = random.alphanumeric.take(count).mkString + send(data) + netOutSub.request(1) + netOut.expectNext().utf8String shouldEqual data + rec(bytesLeft - count) + } + rec(100000) + + netInSub.sendComplete() + requests.expectComplete() + netOut.expectComplete() + } } class TestSetup { @@ -641,7 +682,7 @@ class HttpServerSpec extends AkkaSpec("akka.loggers = []\n akka.loglevel = OFF") def expectNoRequest(max: FiniteDuration): Unit = requests.expectNoMsg(max) def send(data: ByteString): Unit = netInSub.sendNext(data) - def send(data: String): Unit = send(ByteString(data, "ASCII")) + def send(data: String): Unit = send(ByteString(data, "UTF8")) def closeNetworkInput(): Unit = netInSub.sendComplete() }