From 6efe61e64a0f190ce1ffc38ffb9c6afb0956893f Mon Sep 17 00:00:00 2001 From: Konrad Malawski Date: Sat, 31 Oct 2015 12:10:07 -0700 Subject: [PATCH] =htc #16597 more tests for outgoing connection timeout --- .../akka/http/scaladsl/ClientServerSpec.scala | 101 +++++++++++++----- 1 file changed, 72 insertions(+), 29 deletions(-) diff --git a/akka-http-core/src/test/scala/akka/http/scaladsl/ClientServerSpec.scala b/akka-http-core/src/test/scala/akka/http/scaladsl/ClientServerSpec.scala index 85929003fc..904cc3097e 100644 --- a/akka-http-core/src/test/scala/akka/http/scaladsl/ClientServerSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/scaladsl/ClientServerSpec.scala @@ -5,27 +5,29 @@ package akka.http.scaladsl import java.io.{ BufferedReader, BufferedWriter, InputStreamReader, OutputStreamWriter } -import java.net.Socket -import akka.http.scaladsl.Http.ServerBinding -import akka.http.{ ClientConnectionSettings, ServerSettings } -import akka.util.ByteString -import com.typesafe.config.{ Config, ConfigFactory } -import scala.annotation.tailrec -import scala.concurrent.{ Promise, Future, Await } -import scala.concurrent.duration._ -import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpec } +import java.net.{ BindException, Socket } +import java.util.concurrent.TimeoutException + import akka.actor.ActorSystem -import akka.testkit.EventFilter -import akka.stream.{ StreamTcpException, ActorMaterializer, BindFailedException } -import akka.stream.scaladsl._ -import akka.stream.testkit._ +import akka.http.impl.util._ +import akka.http.scaladsl.Http.ServerBinding import akka.http.scaladsl.model.HttpEntity._ import akka.http.scaladsl.model.HttpMethods._ import akka.http.scaladsl.model._ import akka.http.scaladsl.model.headers._ -import akka.http.impl.util._ -import scala.util.{ Failure, Try, Success } -import java.net.BindException +import akka.http.{ ClientConnectionSettings, ServerSettings } +import akka.stream.scaladsl._ +import akka.stream.testkit._ +import akka.stream.{ ActorMaterializer, BindFailedException, StreamTcpException } +import akka.testkit.EventFilter +import akka.util.ByteString +import com.typesafe.config.{ Config, ConfigFactory } +import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpec } + +import scala.annotation.tailrec +import scala.concurrent.duration._ +import scala.concurrent.{ Await, Future, Promise } +import scala.util.Success class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll { val testConf: Config = ConfigFactory.parseString(""" @@ -149,29 +151,70 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll { def handle(req: HttpRequest): Future[HttpResponse] = { receivedRequest.complete(Success(System.nanoTime())) - Promise().future // never complete the request with a response; 're waiting for the timeout to happen, nothing else + Promise().future // never complete the request with a response; we're waiting for the timeout to happen, nothing else } val binding = Http().bindAndHandleAsync(handle, hostname, port, settings = settings) val b1 = Await.result(binding, 3.seconds) - def runIdleRequest(uri: Uri): Future[HttpResponse] = { - val itNeverEnds = Chunked.fromData(ContentTypes.`text/plain`, Source.maybe[ByteString]) - Http().outgoingConnection(hostname, port) - .runWith(Source.single(HttpRequest(PUT, uri, entity = itNeverEnds)), Sink.head) - ._2 + try { + def runIdleRequest(uri: Uri): Future[HttpResponse] = { + val itNeverEnds = Chunked.fromData(ContentTypes.`text/plain`, Source.maybe[ByteString]) + Http().outgoingConnection(hostname, port) + .runWith(Source.single(HttpRequest(PUT, uri, entity = itNeverEnds)), Sink.head) + ._2 + } + + val clientsResponseFuture = runIdleRequest("/") + + // await for the server to get the request + val serverReceivedRequestAtNanos = Await.result(receivedRequest.future, 2.seconds) + + // waiting for the timeout to happen on the client + intercept[StreamTcpException] { Await.result(clientsResponseFuture, 2.second) } + (System.nanoTime() - serverReceivedRequestAtNanos).millis should be >= theIdleTimeout + } finally Await.result(b1.unbind(), 1.second) + } + + "close connection with idle server after idleTimeout" in { + val (_, hostname, port) = TestUtils.temporaryServerHostnameAndPort() + val cs = ClientConnectionSettings(system) + val clientTimeout = 345.millis + val clientSettings = cs.copy(idleTimeout = clientTimeout) + + val s = ServerSettings(system) + val serverTimeout = 10.seconds + val serverSettings = s.copy(timeouts = s.timeouts.copy(idleTimeout = serverTimeout)) + + val receivedRequest = Promise[Long]() + + def handle(req: HttpRequest): Future[HttpResponse] = { + receivedRequest.complete(Success(System.nanoTime())) + Promise().future // never complete the request with a response; we're waiting for the timeout to happen, nothing else } - val clientsResponseFuture = runIdleRequest("/") + val binding = Http().bindAndHandleAsync(handle, hostname, port, settings = serverSettings) + val b1 = Await.result(binding, 3.seconds) - // await for the server to get the request - val serverReceivedRequestAtNanos = Await.result(receivedRequest.future, 2.seconds) + try { + def runRequest(uri: Uri): Future[HttpResponse] = { + val itNeverEnds = Chunked.fromData(ContentTypes.`text/plain`, Source.maybe[ByteString]) + Http().outgoingConnection(hostname, port, settings = clientSettings) + .runWith(Source.single(HttpRequest(POST, uri, entity = itNeverEnds)), Sink.head) + ._2 + } - // waiting for the timeout to happen on the client - intercept[StreamTcpException] { Await.result(clientsResponseFuture, 2.second) } + val clientsResponseFuture = runRequest("/") - val fudge = 100.millis - ((System.nanoTime() - serverReceivedRequestAtNanos).nanos + fudge) should be >= theIdleTimeout + // await for the server to get the request + val serverReceivedRequestAtNanos = Await.result(receivedRequest.future, 2.seconds) + + // waiting for the timeout to happen on the client + intercept[TimeoutException] { Await.result(clientsResponseFuture, 2.second) } + val actualTimeout = System.nanoTime() - serverReceivedRequestAtNanos + actualTimeout.nanos should be >= clientTimeout + actualTimeout.nanos should be < serverTimeout + } finally Await.result(b1.unbind(), 1.second) } "log materialization errors in `bindAndHandle`" which {