=htc #16597 more tests for outgoing connection timeout

This commit is contained in:
Konrad Malawski 2015-10-31 12:10:07 -07:00
parent 06ce968b16
commit 6efe61e64a

View file

@ -5,27 +5,29 @@
package akka.http.scaladsl package akka.http.scaladsl
import java.io.{ BufferedReader, BufferedWriter, InputStreamReader, OutputStreamWriter } import java.io.{ BufferedReader, BufferedWriter, InputStreamReader, OutputStreamWriter }
import java.net.Socket import java.net.{ BindException, Socket }
import akka.http.scaladsl.Http.ServerBinding import java.util.concurrent.TimeoutException
import akka.http.{ ClientConnectionSettings, ServerSettings }
import akka.util.ByteString
import com.typesafe.config.{ Config, ConfigFactory }
import scala.annotation.tailrec
import scala.concurrent.{ Promise, Future, Await }
import scala.concurrent.duration._
import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpec }
import akka.actor.ActorSystem import akka.actor.ActorSystem
import akka.testkit.EventFilter import akka.http.impl.util._
import akka.stream.{ StreamTcpException, ActorMaterializer, BindFailedException } import akka.http.scaladsl.Http.ServerBinding
import akka.stream.scaladsl._
import akka.stream.testkit._
import akka.http.scaladsl.model.HttpEntity._ import akka.http.scaladsl.model.HttpEntity._
import akka.http.scaladsl.model.HttpMethods._ import akka.http.scaladsl.model.HttpMethods._
import akka.http.scaladsl.model._ import akka.http.scaladsl.model._
import akka.http.scaladsl.model.headers._ import akka.http.scaladsl.model.headers._
import akka.http.impl.util._ import akka.http.{ ClientConnectionSettings, ServerSettings }
import scala.util.{ Failure, Try, Success } import akka.stream.scaladsl._
import java.net.BindException import akka.stream.testkit._
import akka.stream.{ ActorMaterializer, BindFailedException, StreamTcpException }
import akka.testkit.EventFilter
import akka.util.ByteString
import com.typesafe.config.{ Config, ConfigFactory }
import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpec }
import scala.annotation.tailrec
import scala.concurrent.duration._
import scala.concurrent.{ Await, Future, Promise }
import scala.util.Success
class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll { class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll {
val testConf: Config = ConfigFactory.parseString(""" val testConf: Config = ConfigFactory.parseString("""
@ -149,29 +151,70 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll {
def handle(req: HttpRequest): Future[HttpResponse] = { def handle(req: HttpRequest): Future[HttpResponse] = {
receivedRequest.complete(Success(System.nanoTime())) receivedRequest.complete(Success(System.nanoTime()))
Promise().future // never complete the request with a response; 're waiting for the timeout to happen, nothing else Promise().future // never complete the request with a response; we're waiting for the timeout to happen, nothing else
} }
val binding = Http().bindAndHandleAsync(handle, hostname, port, settings = settings) val binding = Http().bindAndHandleAsync(handle, hostname, port, settings = settings)
val b1 = Await.result(binding, 3.seconds) val b1 = Await.result(binding, 3.seconds)
def runIdleRequest(uri: Uri): Future[HttpResponse] = { try {
val itNeverEnds = Chunked.fromData(ContentTypes.`text/plain`, Source.maybe[ByteString]) def runIdleRequest(uri: Uri): Future[HttpResponse] = {
Http().outgoingConnection(hostname, port) val itNeverEnds = Chunked.fromData(ContentTypes.`text/plain`, Source.maybe[ByteString])
.runWith(Source.single(HttpRequest(PUT, uri, entity = itNeverEnds)), Sink.head) Http().outgoingConnection(hostname, port)
._2 .runWith(Source.single(HttpRequest(PUT, uri, entity = itNeverEnds)), Sink.head)
._2
}
val clientsResponseFuture = runIdleRequest("/")
// await for the server to get the request
val serverReceivedRequestAtNanos = Await.result(receivedRequest.future, 2.seconds)
// waiting for the timeout to happen on the client
intercept[StreamTcpException] { Await.result(clientsResponseFuture, 2.second) }
(System.nanoTime() - serverReceivedRequestAtNanos).millis should be >= theIdleTimeout
} finally Await.result(b1.unbind(), 1.second)
}
"close connection with idle server after idleTimeout" in {
val (_, hostname, port) = TestUtils.temporaryServerHostnameAndPort()
val cs = ClientConnectionSettings(system)
val clientTimeout = 345.millis
val clientSettings = cs.copy(idleTimeout = clientTimeout)
val s = ServerSettings(system)
val serverTimeout = 10.seconds
val serverSettings = s.copy(timeouts = s.timeouts.copy(idleTimeout = serverTimeout))
val receivedRequest = Promise[Long]()
def handle(req: HttpRequest): Future[HttpResponse] = {
receivedRequest.complete(Success(System.nanoTime()))
Promise().future // never complete the request with a response; we're waiting for the timeout to happen, nothing else
} }
val clientsResponseFuture = runIdleRequest("/") val binding = Http().bindAndHandleAsync(handle, hostname, port, settings = serverSettings)
val b1 = Await.result(binding, 3.seconds)
// await for the server to get the request try {
val serverReceivedRequestAtNanos = Await.result(receivedRequest.future, 2.seconds) def runRequest(uri: Uri): Future[HttpResponse] = {
val itNeverEnds = Chunked.fromData(ContentTypes.`text/plain`, Source.maybe[ByteString])
Http().outgoingConnection(hostname, port, settings = clientSettings)
.runWith(Source.single(HttpRequest(POST, uri, entity = itNeverEnds)), Sink.head)
._2
}
// waiting for the timeout to happen on the client val clientsResponseFuture = runRequest("/")
intercept[StreamTcpException] { Await.result(clientsResponseFuture, 2.second) }
val fudge = 100.millis // await for the server to get the request
((System.nanoTime() - serverReceivedRequestAtNanos).nanos + fudge) should be >= theIdleTimeout val serverReceivedRequestAtNanos = Await.result(receivedRequest.future, 2.seconds)
// waiting for the timeout to happen on the client
intercept[TimeoutException] { Await.result(clientsResponseFuture, 2.second) }
val actualTimeout = System.nanoTime() - serverReceivedRequestAtNanos
actualTimeout.nanos should be >= clientTimeout
actualTimeout.nanos should be < serverTimeout
} finally Await.result(b1.unbind(), 1.second)
} }
"log materialization errors in `bindAndHandle`" which { "log materialization errors in `bindAndHandle`" which {