!htp #19528 HttpServerBluePrint: close connection on request entity cancellation

This commit is contained in:
Anton Karamanov 2016-02-05 11:23:57 +03:00 committed by Anton Karamanov
parent 6ef9ab3276
commit af3fc0c9a6
10 changed files with 185 additions and 9 deletions

View file

@ -4,9 +4,9 @@
package akka.actor.dungeon
import akka.dispatch.sysmsg.{Unwatch, Watch, DeathWatchNotification}
import akka.event.Logging.{Warning, Debug}
import akka.actor.{InternalActorRef, Address, Terminated, Actor, ActorRefScope, ActorCell, ActorRef, MinimalActorRef}
import akka.dispatch.sysmsg.{ Unwatch, Watch, DeathWatchNotification }
import akka.event.Logging.{ Warning, Debug }
import akka.actor.{ InternalActorRef, Address, Terminated, Actor, ActorRefScope, ActorCell, ActorRef, MinimalActorRef }
import akka.event.AddressTerminatedTopic
private[akka] trait DeathWatch { this: ActorCell

View file

@ -53,6 +53,10 @@ The connection can also be closed by the server.
An application can actively trigger the closing of the connection by completing the request stream. In this case the
underlying TCP connection will be closed when the last pending response has been received.
The connection will also be closed if the response entity is cancelled (e.g. by attaching it to ``Sink.cancelled()``)
or consumed only partially (e.g. by using ``take`` combinator). In order to prevent this behaviour the entity should be
explicitly drained by attaching it to ``Sink.ignore()``.
Timeouts
--------

View file

@ -130,6 +130,10 @@ connection. An often times more convenient alternative is to explicitly add a ``
``HttpResponse``. This response will then be the last one on the connection and the server will actively close the
connection when it has been sent out.
Connection will also be closed if request entity has been cancelled (e.g. by attaching it to ``Sink.cancelled()``)
or consumed only partially (e.g. by using ``take`` combinator). In order to prevent this behaviour entity should be
explicitly drained by attaching it to ``Sink.ignore()``.
.. _serverSideHTTPS-java:

View file

@ -156,3 +156,17 @@ Routing settings parameter name
and were accessible via ``settings``. We now made it possible to configure the parsers
settings as well, so ``RoutingSettings`` is now ``routingSettings`` and ``ParserSettings`` is
now accessible via ``parserSettings``.
Client / server behaviour on cancelled entity
---------------------------------------------
Previously if request or response were cancelled or consumed only partially
(e.g. by using ``take`` combinator) the remaining data was silently drained to prevent stalling
the connection, since there could still be more requests / responses incoming. Now the default
behaviour is to close the connection in order to prevent using excessive resource usage in case
of huge entities.
The old behaviour can be achieved by explicitly draining the entity:
response.entity().getDataBytes().runWith(Sink.ignore())

View file

@ -55,6 +55,10 @@ The connection can also be closed by the server.
An application can actively trigger the closing of the connection by completing the request stream. In this case the
underlying TCP connection will be closed when the last pending response has been received.
The connection will also be closed if the response entity is cancelled (e.g. by attaching it to ``Sink.cancelled``)
or consumed only partially (e.g. by using ``take`` combinator). In order to prevent this behaviour the entity should be
explicitly drained by attaching it to ``Sink.ignore``.
Timeouts
--------

View file

@ -132,6 +132,10 @@ connection. An often times more convenient alternative is to explicitly add a ``
``HttpResponse``. This response will then be the last one on the connection and the server will actively close the
connection when it has been sent out.
Connection will also be closed if request entity has been cancelled (e.g. by attaching it to ``Sink.cancelled``)
or consumed only partially (e.g. by using ``take`` combinator). In order to prevent this behaviour entity should be
explicitly drained by attaching it to ``Sink.ignore``.
.. _serverSideHTTPS:

View file

@ -102,6 +102,19 @@ and were accessible via ``settings``. We now made it possible to configure the p
settings as well, so ``RoutingSettings`` is now ``routingSettings`` and ``ParserSettings`` is
now accessible via ``parserSettings``.
Client / server behaviour on cancelled entity
---------------------------------------------
Previously if request or response were cancelled or consumed only partially
(e.g. by using ``take`` combinator) the remaining data was silently drained to prevent stalling
the connection, since there could still be more requests / responses incoming. Now the default
behaviour is to close the connection in order to prevent using excessive resource usage in case
of huge entities.
The old behaviour can be achieved by explicitly draining the entity:
response.entity.dataBytes.runWith(Sink.ignore)
Changed Sources / Sinks
=======================

View file

@ -104,12 +104,23 @@ private[http] object HttpServerBluePrint {
val remoteAddress = inheritedAttributes.get[HttpAttributes.RemoteAddress].flatMap(_.address)
var downstreamPullWaiting = false
var completionDeferred = false
var entitySource: SubSourceOutlet[RequestOutput] = _
// optimization: to avoid allocations the "idle" case in and out handlers are put directly on the GraphStageLogic itself
override def onPull(): Unit = {
pull(in)
}
// optimization: this callback is used to handle entity substream cancellation to avoid allocating a dedicated handler
override def onDownstreamFinish(): Unit = {
if (entitySource ne null) {
// application layer has cancelled or only partially consumed response entity:
// connection will be closed
entitySource.complete()
completeStage()
}
}
override def onPush(): Unit = grab(in) match {
case RequestStart(method, uri, protocol, hdrs, entityCreator, _, _)
val effectiveMethod = if (method == HttpMethods.HEAD && settings.transparentHeadRequests) HttpMethods.GET else method
@ -126,7 +137,7 @@ private[http] object HttpServerBluePrint {
setIdleHandlers()
def setIdleHandlers() {
def setIdleHandlers(): Unit = {
if (completionDeferred) {
completeStage()
} else {
@ -150,15 +161,17 @@ private[http] object HttpServerBluePrint {
// stream incoming chunks into the request entity until we reach the end of it
// and then toggle back to "idle"
val entitySource = new SubSourceOutlet[RequestOutput]("EntitySource")
entitySource = new SubSourceOutlet[RequestOutput]("EntitySource")
// optimization: re-use the idle outHandler
entitySource.setHandler(this)
setHandler(in, new InHandler {
// optimization: handlers are combined to reduce allocations
val chunkedRequestHandler = new InHandler with OutHandler {
def onPush(): Unit = {
grab(in) match {
case MessageEnd
entitySource.complete()
entitySource = null
setIdleHandlers()
case x entitySource.push(x)
@ -172,8 +185,6 @@ private[http] object HttpServerBluePrint {
entitySource.fail(ex)
failStage(ex)
}
})
setHandler(out, new OutHandler {
override def onPull(): Unit = {
// remember this until we are done with the chunked entity
// so can pull downstream then
@ -185,7 +196,10 @@ private[http] object HttpServerBluePrint {
// when it completes complete the stage
completionDeferred = true
}
})
}
setHandler(in, chunkedRequestHandler)
setHandler(out, chunkedRequestHandler)
creator(Source.fromGraph(entitySource.source))
}

View file

@ -7,6 +7,7 @@ package akka.http.impl.engine.client
import scala.concurrent.duration._
import scala.reflect.ClassTag
import org.scalatest.Inside
import org.scalatest.concurrent.ScalaFutures
import akka.http.scaladsl.settings.ClientConnectionSettings
import akka.stream.io.{ SessionBytes, SslTlsOutbound, SendBytes }
import akka.util.ByteString
@ -137,6 +138,76 @@ class LowLevelOutgoingConnectionSpec extends AkkaSpec("akka.loggers = []\n akka.
}
}
"close the connection if response entity stream has been cancelled" in new TestSetup {
// two requests are sent in order to make sure that connection
// isn't immediately closed after the first one by the server
requestsSub.sendNext(HttpRequest())
requestsSub.sendNext(HttpRequest())
requestsSub.sendComplete()
expectWireData(
"""GET / HTTP/1.1
|Host: example.com
|User-Agent: akka-http/test
|
|""")
// two chunks sent by server
sendWireData(
"""HTTP/1.1 200 OK
|Transfer-Encoding: chunked
|
|6
|abcdef
|6
|abcdef
|0
|
|""")
inside(expectResponse()) {
case HttpResponse(StatusCodes.OK, _, HttpEntity.Chunked(_, data), _) =>
val dataProbe = TestSubscriber.manualProbe[ChunkStreamPart]
// but only one consumed by server
data.take(1).to(Sink.fromSubscriber(dataProbe)).run()
val sub = dataProbe.expectSubscription()
sub.request(1)
dataProbe.expectNext(Chunk(ByteString("abcdef")))
dataProbe.expectComplete()
// connection is closed once requested elements are consumed
netInSub.expectCancellation()
}
}
"proceed to next response once previous response's entity has been drained" in new TestSetup with ScalaFutures {
def twice(action: => Unit): Unit = { action; action }
twice {
requestsSub.sendNext(HttpRequest())
expectWireData(
"""GET / HTTP/1.1
|Host: example.com
|User-Agent: akka-http/test
|
|""")
sendWireData(
"""HTTP/1.1 200 OK
|Transfer-Encoding: chunked
|
|6
|abcdef
|0
|
|""")
val whenComplete = expectResponse().entity.dataBytes.runWith(Sink.ignore)
whenComplete.futureValue should be (akka.Done)
}
}
"handle several requests on one persistent connection" which {
"has a first response that was chunked" in new TestSetup {
requestsSub.sendNext(HttpRequest())

View file

@ -11,6 +11,7 @@ import scala.util.Random
import scala.annotation.tailrec
import scala.concurrent.duration._
import org.scalatest.Inside
import org.scalatest.concurrent.ScalaFutures
import akka.util.ByteString
import akka.stream.scaladsl._
import akka.stream.ActorMaterializer
@ -325,6 +326,53 @@ class HttpServerSpec extends AkkaSpec(
}
}
"close the connection if request entity stream has been cancelled" in new TestSetup {
// two chunks sent by client
send("""POST / HTTP/1.1
|Host: example.com
|Transfer-Encoding: chunked
|
|6
|abcdef
|6
|abcdef
|0
|
|""")
inside(expectRequest()) {
case HttpRequest(POST, _, _, HttpEntity.Chunked(_, data), _)
val dataProbe = TestSubscriber.manualProbe[ChunkStreamPart]
// but only one consumed by server
data.take(1).to(Sink.fromSubscriber(dataProbe)).run()
val sub = dataProbe.expectSubscription()
sub.request(1)
dataProbe.expectNext(Chunk(ByteString("abcdef")))
dataProbe.expectComplete()
// connection closes once requested elements are consumed
netIn.expectCancellation()
}
}
"proceed to next request once previous request's entity has beed drained" in new TestSetup with ScalaFutures {
def twice(action: => Unit): Unit = { action; action }
twice {
send("""POST / HTTP/1.1
|Host: example.com
|Transfer-Encoding: chunked
|
|6
|abcdef
|0
|
|""")
val whenComplete = expectRequest().entity.dataBytes.runWith(Sink.ignore)
whenComplete.futureValue should be (akka.Done)
}
}
"report a truncated entity stream on the entity data stream and the main stream for a Default entity" in new TestSetup {
send("""POST / HTTP/1.1
|Host: example.com