From 234aaadaa704b6eef9f142e1a35a4674c5555ba0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Endre=20S=C3=A1ndor=20Varga?= Date: Wed, 24 Jun 2015 16:15:01 +0200 Subject: [PATCH] Added test for cancellation scenarios in Http client/server --- .../client/ClientCancellationSpec.scala | 76 +++++++++++++++++++ 1 file changed, 76 insertions(+) create mode 100644 akka-http-core/src/test/scala/akka/http/impl/engine/client/ClientCancellationSpec.scala diff --git a/akka-http-core/src/test/scala/akka/http/impl/engine/client/ClientCancellationSpec.scala b/akka-http-core/src/test/scala/akka/http/impl/engine/client/ClientCancellationSpec.scala new file mode 100644 index 0000000000..52c7b40653 --- /dev/null +++ b/akka-http-core/src/test/scala/akka/http/impl/engine/client/ClientCancellationSpec.scala @@ -0,0 +1,76 @@ +package akka.http.impl.engine.client + +import javax.net.ssl.SSLContext + +import akka.http.scaladsl.{ HttpsContext, Http } +import akka.http.scaladsl.model.{ HttpHeader, HttpResponse, HttpRequest } +import akka.stream.ActorMaterializer +import akka.stream.scaladsl.{ Flow, Sink, Source } +import akka.stream.testkit.{ TestSubscriber, TestPublisher, AkkaSpec, TestUtils, Utils } +import akka.http.scaladsl.model.headers + +class ClientCancellationSpec extends AkkaSpec(""" + #akka.loggers = [] + akka.loglevel = DEBUG + #akka.io.tcp.trace-logging = off + akka.io.tcp.windows-connection-abort-workaround-enabled=auto""") { + + implicit val materializer = ActorMaterializer() + val noncheckedMaterializer = ActorMaterializer() + + "Http client connections" must { + val address = TestUtils.temporaryServerAddress() + Http().bindAndHandleSync( + { req ⇒ HttpResponse(headers = headers.Connection("close") :: Nil) }, + address.getHostName, + address.getPort)(noncheckedMaterializer) + + val addressTls = TestUtils.temporaryServerAddress() + Http().bindAndHandleSync( + { req ⇒ HttpResponse() }, // TLS client does full-close, no need for the connection:close header + addressTls.getHostName, + addressTls.getPort, + httpsContext = Some(HttpsContext(SSLContext.getDefault)))(noncheckedMaterializer) + + def testCase(connection: Flow[HttpRequest, HttpResponse, Any]): Unit = Utils.assertAllStagesStopped { + val requests = TestPublisher.probe[HttpRequest]() + val responses = TestSubscriber.probe[HttpResponse]() + Source(requests).via(connection).runWith(Sink(responses)) + responses.request(1) + requests.sendNext(HttpRequest()) + responses.expectNext().entity.dataBytes.runWith(Sink.cancelled) + responses.cancel() + requests.expectCancellation() + } + + "support cancellation in simple outgoing connection" in { + testCase( + Http().outgoingConnection(address.getHostName, address.getPort)) + } + + "support cancellation in pooled outgoing connection" in { + testCase( + Flow[HttpRequest] + .map((_, ())) + .via(Http().cachedHostConnectionPool(address.getHostName, address.getPort)(noncheckedMaterializer)) + .map(_._1.get)) + } + + "support cancellation in simple outgoing connection with TLS" in { + pending + testCase( + Http().outgoingConnectionTls(addressTls.getHostName, addressTls.getPort)) + } + + "support cancellation in pooled outgoing connection with TLS" in { + pending + testCase( + Flow[HttpRequest] + .map((_, ())) + .via(Http().cachedHostConnectionPoolTls(addressTls.getHostName, addressTls.getPort)(noncheckedMaterializer)) + .map(_._1.get)) + } + + } + +}