From af3fc0c9a62aa0892d396941a5886cc3388316f3 Mon Sep 17 00:00:00 2001 From: Anton Karamanov Date: Fri, 5 Feb 2016 11:23:57 +0300 Subject: [PATCH] !htp #19528 HttpServerBluePrint: close connection on request entity cancellation --- .../scala/akka/actor/dungeon/DeathWatch.scala | 6 +- .../http/client-side/connection-level.rst | 4 ++ .../server-side/low-level-server-side-api.rst | 4 ++ .../stream/migration-guide-2.0-2.4-java.rst | 14 ++++ .../http/client-side/connection-level.rst | 4 ++ .../scala/http/low-level-server-side-api.rst | 4 ++ .../stream/migration-guide-2.0-2.4-scala.rst | 13 ++++ .../engine/server/HttpServerBluePrint.scala | 26 +++++-- .../LowLevelOutgoingConnectionSpec.scala | 71 +++++++++++++++++++ .../impl/engine/server/HttpServerSpec.scala | 48 +++++++++++++ 10 files changed, 185 insertions(+), 9 deletions(-) diff --git a/akka-actor/src/main/scala/akka/actor/dungeon/DeathWatch.scala b/akka-actor/src/main/scala/akka/actor/dungeon/DeathWatch.scala index dbeaf3a7a6..cf49c7f157 100644 --- a/akka-actor/src/main/scala/akka/actor/dungeon/DeathWatch.scala +++ b/akka-actor/src/main/scala/akka/actor/dungeon/DeathWatch.scala @@ -4,9 +4,9 @@ package akka.actor.dungeon -import akka.dispatch.sysmsg.{Unwatch, Watch, DeathWatchNotification} -import akka.event.Logging.{Warning, Debug} -import akka.actor.{InternalActorRef, Address, Terminated, Actor, ActorRefScope, ActorCell, ActorRef, MinimalActorRef} +import akka.dispatch.sysmsg.{ Unwatch, Watch, DeathWatchNotification } +import akka.event.Logging.{ Warning, Debug } +import akka.actor.{ InternalActorRef, Address, Terminated, Actor, ActorRefScope, ActorCell, ActorRef, MinimalActorRef } import akka.event.AddressTerminatedTopic private[akka] trait DeathWatch { this: ActorCell ⇒ diff --git a/akka-docs/rst/java/http/client-side/connection-level.rst b/akka-docs/rst/java/http/client-side/connection-level.rst index 36eec3c7bc..eb2ff3c021 100644 --- a/akka-docs/rst/java/http/client-side/connection-level.rst +++ b/akka-docs/rst/java/http/client-side/connection-level.rst @@ -53,6 +53,10 @@ The connection can also be closed by the server. An application can actively trigger the closing of the connection by completing the request stream. In this case the underlying TCP connection will be closed when the last pending response has been received. +The connection will also be closed if the response entity is cancelled (e.g. by attaching it to ``Sink.cancelled()``) +or consumed only partially (e.g. by using ``take`` combinator). In order to prevent this behaviour the entity should be +explicitly drained by attaching it to ``Sink.ignore()``. + Timeouts -------- diff --git a/akka-docs/rst/java/http/server-side/low-level-server-side-api.rst b/akka-docs/rst/java/http/server-side/low-level-server-side-api.rst index f13ebd2647..ec883e5e26 100644 --- a/akka-docs/rst/java/http/server-side/low-level-server-side-api.rst +++ b/akka-docs/rst/java/http/server-side/low-level-server-side-api.rst @@ -130,6 +130,10 @@ connection. An often times more convenient alternative is to explicitly add a `` ``HttpResponse``. This response will then be the last one on the connection and the server will actively close the connection when it has been sent out. +Connection will also be closed if request entity has been cancelled (e.g. by attaching it to ``Sink.cancelled()``) +or consumed only partially (e.g. by using ``take`` combinator). In order to prevent this behaviour entity should be +explicitly drained by attaching it to ``Sink.ignore()``. + .. _serverSideHTTPS-java: diff --git a/akka-docs/rst/java/stream/migration-guide-2.0-2.4-java.rst b/akka-docs/rst/java/stream/migration-guide-2.0-2.4-java.rst index fcf3156e04..89206a3128 100644 --- a/akka-docs/rst/java/stream/migration-guide-2.0-2.4-java.rst +++ b/akka-docs/rst/java/stream/migration-guide-2.0-2.4-java.rst @@ -156,3 +156,17 @@ Routing settings parameter name and were accessible via ``settings``. We now made it possible to configure the parsers settings as well, so ``RoutingSettings`` is now ``routingSettings`` and ``ParserSettings`` is now accessible via ``parserSettings``. + +Client / server behaviour on cancelled entity +--------------------------------------------- + +Previously if request or response were cancelled or consumed only partially +(e.g. by using ``take`` combinator) the remaining data was silently drained to prevent stalling +the connection, since there could still be more requests / responses incoming. Now the default +behaviour is to close the connection in order to prevent using excessive resource usage in case +of huge entities. + +The old behaviour can be achieved by explicitly draining the entity: + + response.entity().getDataBytes().runWith(Sink.ignore()) + diff --git a/akka-docs/rst/scala/http/client-side/connection-level.rst b/akka-docs/rst/scala/http/client-side/connection-level.rst index 452a633d20..ec98c22686 100644 --- a/akka-docs/rst/scala/http/client-side/connection-level.rst +++ b/akka-docs/rst/scala/http/client-side/connection-level.rst @@ -55,6 +55,10 @@ The connection can also be closed by the server. An application can actively trigger the closing of the connection by completing the request stream. In this case the underlying TCP connection will be closed when the last pending response has been received. +The connection will also be closed if the response entity is cancelled (e.g. by attaching it to ``Sink.cancelled``) +or consumed only partially (e.g. by using ``take`` combinator). In order to prevent this behaviour the entity should be +explicitly drained by attaching it to ``Sink.ignore``. + Timeouts -------- diff --git a/akka-docs/rst/scala/http/low-level-server-side-api.rst b/akka-docs/rst/scala/http/low-level-server-side-api.rst index b4c2156b44..91b7faadd2 100644 --- a/akka-docs/rst/scala/http/low-level-server-side-api.rst +++ b/akka-docs/rst/scala/http/low-level-server-side-api.rst @@ -132,6 +132,10 @@ connection. An often times more convenient alternative is to explicitly add a `` ``HttpResponse``. This response will then be the last one on the connection and the server will actively close the connection when it has been sent out. +Connection will also be closed if request entity has been cancelled (e.g. by attaching it to ``Sink.cancelled``) +or consumed only partially (e.g. by using ``take`` combinator). In order to prevent this behaviour entity should be +explicitly drained by attaching it to ``Sink.ignore``. + .. _serverSideHTTPS: diff --git a/akka-docs/rst/scala/stream/migration-guide-2.0-2.4-scala.rst b/akka-docs/rst/scala/stream/migration-guide-2.0-2.4-scala.rst index 69ed312d23..6658f28540 100644 --- a/akka-docs/rst/scala/stream/migration-guide-2.0-2.4-scala.rst +++ b/akka-docs/rst/scala/stream/migration-guide-2.0-2.4-scala.rst @@ -102,6 +102,19 @@ and were accessible via ``settings``. We now made it possible to configure the p settings as well, so ``RoutingSettings`` is now ``routingSettings`` and ``ParserSettings`` is now accessible via ``parserSettings``. +Client / server behaviour on cancelled entity +--------------------------------------------- + +Previously if request or response were cancelled or consumed only partially +(e.g. by using ``take`` combinator) the remaining data was silently drained to prevent stalling +the connection, since there could still be more requests / responses incoming. Now the default +behaviour is to close the connection in order to prevent using excessive resource usage in case +of huge entities. + +The old behaviour can be achieved by explicitly draining the entity: + + response.entity.dataBytes.runWith(Sink.ignore) + Changed Sources / Sinks ======================= diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala index 2da12a0ec1..679cde6793 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala @@ -104,12 +104,23 @@ private[http] object HttpServerBluePrint { val remoteAddress = inheritedAttributes.get[HttpAttributes.RemoteAddress].flatMap(_.address) var downstreamPullWaiting = false var completionDeferred = false + var entitySource: SubSourceOutlet[RequestOutput] = _ // optimization: to avoid allocations the "idle" case in and out handlers are put directly on the GraphStageLogic itself override def onPull(): Unit = { pull(in) } + // optimization: this callback is used to handle entity substream cancellation to avoid allocating a dedicated handler + override def onDownstreamFinish(): Unit = { + if (entitySource ne null) { + // application layer has cancelled or only partially consumed response entity: + // connection will be closed + entitySource.complete() + completeStage() + } + } + override def onPush(): Unit = grab(in) match { case RequestStart(method, uri, protocol, hdrs, entityCreator, _, _) ⇒ val effectiveMethod = if (method == HttpMethods.HEAD && settings.transparentHeadRequests) HttpMethods.GET else method @@ -126,7 +137,7 @@ private[http] object HttpServerBluePrint { setIdleHandlers() - def setIdleHandlers() { + def setIdleHandlers(): Unit = { if (completionDeferred) { completeStage() } else { @@ -150,15 +161,17 @@ private[http] object HttpServerBluePrint { // stream incoming chunks into the request entity until we reach the end of it // and then toggle back to "idle" - val entitySource = new SubSourceOutlet[RequestOutput]("EntitySource") + entitySource = new SubSourceOutlet[RequestOutput]("EntitySource") // optimization: re-use the idle outHandler entitySource.setHandler(this) - setHandler(in, new InHandler { + // optimization: handlers are combined to reduce allocations + val chunkedRequestHandler = new InHandler with OutHandler { def onPush(): Unit = { grab(in) match { case MessageEnd ⇒ entitySource.complete() + entitySource = null setIdleHandlers() case x ⇒ entitySource.push(x) @@ -172,8 +185,6 @@ private[http] object HttpServerBluePrint { entitySource.fail(ex) failStage(ex) } - }) - setHandler(out, new OutHandler { override def onPull(): Unit = { // remember this until we are done with the chunked entity // so can pull downstream then @@ -185,7 +196,10 @@ private[http] object HttpServerBluePrint { // when it completes complete the stage completionDeferred = true } - }) + } + + setHandler(in, chunkedRequestHandler) + setHandler(out, chunkedRequestHandler) creator(Source.fromGraph(entitySource.source)) } diff --git a/akka-http-core/src/test/scala/akka/http/impl/engine/client/LowLevelOutgoingConnectionSpec.scala b/akka-http-core/src/test/scala/akka/http/impl/engine/client/LowLevelOutgoingConnectionSpec.scala index 237e57b45d..01ba290723 100644 --- a/akka-http-core/src/test/scala/akka/http/impl/engine/client/LowLevelOutgoingConnectionSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/impl/engine/client/LowLevelOutgoingConnectionSpec.scala @@ -7,6 +7,7 @@ package akka.http.impl.engine.client import scala.concurrent.duration._ import scala.reflect.ClassTag import org.scalatest.Inside +import org.scalatest.concurrent.ScalaFutures import akka.http.scaladsl.settings.ClientConnectionSettings import akka.stream.io.{ SessionBytes, SslTlsOutbound, SendBytes } import akka.util.ByteString @@ -137,6 +138,76 @@ class LowLevelOutgoingConnectionSpec extends AkkaSpec("akka.loggers = []\n akka. } } + "close the connection if response entity stream has been cancelled" in new TestSetup { + // two requests are sent in order to make sure that connection + // isn't immediately closed after the first one by the server + requestsSub.sendNext(HttpRequest()) + requestsSub.sendNext(HttpRequest()) + requestsSub.sendComplete() + + expectWireData( + """GET / HTTP/1.1 + |Host: example.com + |User-Agent: akka-http/test + | + |""") + + // two chunks sent by server + sendWireData( + """HTTP/1.1 200 OK + |Transfer-Encoding: chunked + | + |6 + |abcdef + |6 + |abcdef + |0 + | + |""") + + inside(expectResponse()) { + case HttpResponse(StatusCodes.OK, _, HttpEntity.Chunked(_, data), _) => + val dataProbe = TestSubscriber.manualProbe[ChunkStreamPart] + // but only one consumed by server + data.take(1).to(Sink.fromSubscriber(dataProbe)).run() + val sub = dataProbe.expectSubscription() + sub.request(1) + dataProbe.expectNext(Chunk(ByteString("abcdef"))) + dataProbe.expectComplete() + // connection is closed once requested elements are consumed + netInSub.expectCancellation() + } + } + + "proceed to next response once previous response's entity has been drained" in new TestSetup with ScalaFutures { + def twice(action: => Unit): Unit = { action; action } + + twice { + requestsSub.sendNext(HttpRequest()) + + expectWireData( + """GET / HTTP/1.1 + |Host: example.com + |User-Agent: akka-http/test + | + |""") + + sendWireData( + """HTTP/1.1 200 OK + |Transfer-Encoding: chunked + | + |6 + |abcdef + |0 + | + |""") + + val whenComplete = expectResponse().entity.dataBytes.runWith(Sink.ignore) + whenComplete.futureValue should be (akka.Done) + } + } + + "handle several requests on one persistent connection" which { "has a first response that was chunked" in new TestSetup { requestsSub.sendNext(HttpRequest()) diff --git a/akka-http-core/src/test/scala/akka/http/impl/engine/server/HttpServerSpec.scala b/akka-http-core/src/test/scala/akka/http/impl/engine/server/HttpServerSpec.scala index 720bf6a1dd..0fc5101a16 100644 --- a/akka-http-core/src/test/scala/akka/http/impl/engine/server/HttpServerSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/impl/engine/server/HttpServerSpec.scala @@ -11,6 +11,7 @@ import scala.util.Random import scala.annotation.tailrec import scala.concurrent.duration._ import org.scalatest.Inside +import org.scalatest.concurrent.ScalaFutures import akka.util.ByteString import akka.stream.scaladsl._ import akka.stream.ActorMaterializer @@ -325,6 +326,53 @@ class HttpServerSpec extends AkkaSpec( } } + "close the connection if request entity stream has been cancelled" in new TestSetup { + // two chunks sent by client + send("""POST / HTTP/1.1 + |Host: example.com + |Transfer-Encoding: chunked + | + |6 + |abcdef + |6 + |abcdef + |0 + | + |""") + + inside(expectRequest()) { + case HttpRequest(POST, _, _, HttpEntity.Chunked(_, data), _) ⇒ + val dataProbe = TestSubscriber.manualProbe[ChunkStreamPart] + // but only one consumed by server + data.take(1).to(Sink.fromSubscriber(dataProbe)).run() + val sub = dataProbe.expectSubscription() + sub.request(1) + dataProbe.expectNext(Chunk(ByteString("abcdef"))) + dataProbe.expectComplete() + // connection closes once requested elements are consumed + netIn.expectCancellation() + } + } + + "proceed to next request once previous request's entity has beed drained" in new TestSetup with ScalaFutures { + def twice(action: => Unit): Unit = { action; action } + + twice { + send("""POST / HTTP/1.1 + |Host: example.com + |Transfer-Encoding: chunked + | + |6 + |abcdef + |0 + | + |""") + + val whenComplete = expectRequest().entity.dataBytes.runWith(Sink.ignore) + whenComplete.futureValue should be (akka.Done) + } + } + "report a truncated entity stream on the entity data stream and the main stream for a Default entity" in new TestSetup { send("""POST / HTTP/1.1 |Host: example.com