Merge pull request #18854 from ktoso/mwip-more-idleTimeout-client-tests-ktoso
=str #17732 covered all client APIs to check if idle timeouts work
This commit is contained in:
commit
42ff7d7a59
2 changed files with 138 additions and 66 deletions
|
|
@ -9,13 +9,14 @@ import java.net.{ BindException, Socket }
|
||||||
import java.util.concurrent.TimeoutException
|
import java.util.concurrent.TimeoutException
|
||||||
|
|
||||||
import akka.actor.ActorSystem
|
import akka.actor.ActorSystem
|
||||||
|
import akka.event.NoLogging
|
||||||
import akka.http.impl.util._
|
import akka.http.impl.util._
|
||||||
import akka.http.scaladsl.Http.ServerBinding
|
import akka.http.scaladsl.Http.ServerBinding
|
||||||
import akka.http.scaladsl.model.HttpEntity._
|
import akka.http.scaladsl.model.HttpEntity._
|
||||||
import akka.http.scaladsl.model.HttpMethods._
|
import akka.http.scaladsl.model.HttpMethods._
|
||||||
import akka.http.scaladsl.model._
|
import akka.http.scaladsl.model._
|
||||||
import akka.http.scaladsl.model.headers._
|
import akka.http.scaladsl.model.headers._
|
||||||
import akka.http.{ ClientConnectionSettings, ServerSettings }
|
import akka.http.{ ConnectionPoolSettings, ClientConnectionSettings, ServerSettings }
|
||||||
import akka.stream.scaladsl._
|
import akka.stream.scaladsl._
|
||||||
import akka.stream.testkit._
|
import akka.stream.testkit._
|
||||||
import akka.stream.{ ActorMaterializer, BindFailedException, StreamTcpException }
|
import akka.stream.{ ActorMaterializer, BindFailedException, StreamTcpException }
|
||||||
|
|
@ -27,7 +28,7 @@ import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpec }
|
||||||
import scala.annotation.tailrec
|
import scala.annotation.tailrec
|
||||||
import scala.concurrent.duration._
|
import scala.concurrent.duration._
|
||||||
import scala.concurrent.{ Await, Future, Promise }
|
import scala.concurrent.{ Await, Future, Promise }
|
||||||
import scala.util.Success
|
import scala.util.{ Try, Success }
|
||||||
|
|
||||||
class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll {
|
class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll {
|
||||||
val testConf: Config = ConfigFactory.parseString("""
|
val testConf: Config = ConfigFactory.parseString("""
|
||||||
|
|
@ -141,80 +142,151 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll {
|
||||||
Await.result(b1.unbind(), 1.second)
|
Await.result(b1.unbind(), 1.second)
|
||||||
}
|
}
|
||||||
|
|
||||||
"close connection with idle client after idleTimeout" in {
|
"timeouts" should {
|
||||||
val (_, hostname, port) = TestUtils.temporaryServerHostnameAndPort()
|
def bindServer(hostname: String, port: Int, serverTimeout: FiniteDuration): (Promise[Long], ServerBinding) = {
|
||||||
val s = ServerSettings(system)
|
val s = ServerSettings(system)
|
||||||
val theIdleTimeout = 300.millis
|
val settings = s.copy(timeouts = s.timeouts.copy(idleTimeout = serverTimeout))
|
||||||
val settings = s.copy(timeouts = s.timeouts.copy(idleTimeout = theIdleTimeout))
|
|
||||||
|
|
||||||
val receivedRequest = Promise[Long]()
|
val receivedRequest = Promise[Long]()
|
||||||
|
|
||||||
def handle(req: HttpRequest): Future[HttpResponse] = {
|
def handle(req: HttpRequest): Future[HttpResponse] = {
|
||||||
receivedRequest.complete(Success(System.nanoTime()))
|
receivedRequest.complete(Success(System.nanoTime()))
|
||||||
Promise().future // never complete the request with a response; we're waiting for the timeout to happen, nothing else
|
Promise().future // never complete the request with a response; we're waiting for the timeout to happen, nothing else
|
||||||
}
|
|
||||||
|
|
||||||
val binding = Http().bindAndHandleAsync(handle, hostname, port, settings = settings)
|
|
||||||
val b1 = Await.result(binding, 3.seconds)
|
|
||||||
|
|
||||||
try {
|
|
||||||
def runIdleRequest(uri: Uri): Future[HttpResponse] = {
|
|
||||||
val itNeverEnds = Chunked.fromData(ContentTypes.`text/plain`, Source.maybe[ByteString])
|
|
||||||
Http().outgoingConnection(hostname, port)
|
|
||||||
.runWith(Source.single(HttpRequest(PUT, uri, entity = itNeverEnds)), Sink.head)
|
|
||||||
._2
|
|
||||||
}
|
}
|
||||||
|
|
||||||
val clientsResponseFuture = runIdleRequest("/")
|
val binding = Http().bindAndHandleAsync(handle, hostname, port, settings = settings)
|
||||||
|
val b1 = Await.result(binding, 3.seconds)
|
||||||
// await for the server to get the request
|
(receivedRequest, b1)
|
||||||
val serverReceivedRequestAtNanos = Await.result(receivedRequest.future, 2.seconds)
|
|
||||||
|
|
||||||
// waiting for the timeout to happen on the client
|
|
||||||
intercept[StreamTcpException] { Await.result(clientsResponseFuture, 2.second) }
|
|
||||||
(System.nanoTime() - serverReceivedRequestAtNanos).millis should be >= theIdleTimeout
|
|
||||||
} finally Await.result(b1.unbind(), 1.second)
|
|
||||||
}
|
|
||||||
|
|
||||||
"close connection with idle server after idleTimeout" in {
|
|
||||||
val (_, hostname, port) = TestUtils.temporaryServerHostnameAndPort()
|
|
||||||
val cs = ClientConnectionSettings(system)
|
|
||||||
val clientTimeout = 345.millis
|
|
||||||
val clientSettings = cs.copy(idleTimeout = clientTimeout)
|
|
||||||
|
|
||||||
val s = ServerSettings(system)
|
|
||||||
val serverTimeout = 10.seconds
|
|
||||||
val serverSettings = s.copy(timeouts = s.timeouts.copy(idleTimeout = serverTimeout))
|
|
||||||
|
|
||||||
val receivedRequest = Promise[Long]()
|
|
||||||
|
|
||||||
def handle(req: HttpRequest): Future[HttpResponse] = {
|
|
||||||
receivedRequest.complete(Success(System.nanoTime()))
|
|
||||||
Promise().future // never complete the request with a response; we're waiting for the timeout to happen, nothing else
|
|
||||||
}
|
}
|
||||||
|
|
||||||
val binding = Http().bindAndHandleAsync(handle, hostname, port, settings = serverSettings)
|
"support server timeouts" should {
|
||||||
val b1 = Await.result(binding, 3.seconds)
|
"close connection with idle client after idleTimeout" in {
|
||||||
|
val serverTimeout = 300.millis
|
||||||
|
val (_, hostname, port) = TestUtils.temporaryServerHostnameAndPort()
|
||||||
|
val (receivedRequest: Promise[Long], b1: ServerBinding) = bindServer(hostname, port, serverTimeout)
|
||||||
|
|
||||||
try {
|
try {
|
||||||
def runRequest(uri: Uri): Future[HttpResponse] = {
|
def runIdleRequest(uri: Uri): Future[HttpResponse] = {
|
||||||
val itNeverEnds = Chunked.fromData(ContentTypes.`text/plain`, Source.maybe[ByteString])
|
val itNeverEnds = Chunked.fromData(ContentTypes.`text/plain`, Source.maybe[ByteString])
|
||||||
Http().outgoingConnection(hostname, port, settings = clientSettings)
|
Http().outgoingConnection(hostname, port)
|
||||||
.runWith(Source.single(HttpRequest(POST, uri, entity = itNeverEnds)), Sink.head)
|
.runWith(Source.single(HttpRequest(PUT, uri, entity = itNeverEnds)), Sink.head)
|
||||||
._2
|
._2
|
||||||
|
}
|
||||||
|
|
||||||
|
val clientsResponseFuture = runIdleRequest("/")
|
||||||
|
|
||||||
|
// await for the server to get the request
|
||||||
|
val serverReceivedRequestAtNanos = Await.result(receivedRequest.future, 2.seconds)
|
||||||
|
|
||||||
|
// waiting for the timeout to happen on the client
|
||||||
|
intercept[StreamTcpException] {
|
||||||
|
Await.result(clientsResponseFuture, 2.second)
|
||||||
|
}
|
||||||
|
(System.nanoTime() - serverReceivedRequestAtNanos).millis should be >= serverTimeout
|
||||||
|
} finally Await.result(b1.unbind(), 1.second)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
"support client timeouts" should {
|
||||||
|
"close connection with idle server after idleTimeout (using connection level client API)" in {
|
||||||
|
val serverTimeout = 10.seconds
|
||||||
|
|
||||||
|
val cs = ClientConnectionSettings(system)
|
||||||
|
val clientTimeout = 345.millis
|
||||||
|
val clientSettings = cs.copy(idleTimeout = clientTimeout)
|
||||||
|
|
||||||
|
val (_, hostname, port) = TestUtils.temporaryServerHostnameAndPort()
|
||||||
|
val (receivedRequest: Promise[Long], b1: ServerBinding) = bindServer(hostname, port, serverTimeout)
|
||||||
|
|
||||||
|
try {
|
||||||
|
def runRequest(uri: Uri): Future[HttpResponse] = {
|
||||||
|
val itNeverSends = Chunked.fromData(ContentTypes.`text/plain`, Source.maybe[ByteString])
|
||||||
|
Http().outgoingConnection(hostname, port, settings = clientSettings)
|
||||||
|
.runWith(Source.single(HttpRequest(POST, uri, entity = itNeverSends)), Sink.head)
|
||||||
|
._2
|
||||||
|
}
|
||||||
|
|
||||||
|
val clientsResponseFuture = runRequest("/")
|
||||||
|
|
||||||
|
// await for the server to get the request
|
||||||
|
val serverReceivedRequestAtNanos = Await.result(receivedRequest.future, 2.seconds)
|
||||||
|
|
||||||
|
// waiting for the timeout to happen on the client
|
||||||
|
intercept[TimeoutException] {
|
||||||
|
Await.result(clientsResponseFuture, 2.second)
|
||||||
|
}
|
||||||
|
val actualTimeout = System.nanoTime() - serverReceivedRequestAtNanos
|
||||||
|
actualTimeout.nanos should be >= clientTimeout
|
||||||
|
actualTimeout.nanos should be < serverTimeout
|
||||||
|
} finally Await.result(b1.unbind(), 1.second)
|
||||||
}
|
}
|
||||||
|
|
||||||
val clientsResponseFuture = runRequest("/")
|
"close connection with idle server after idleTimeout (using pool level client API)" in {
|
||||||
|
val serverTimeout = 10.seconds
|
||||||
|
|
||||||
// await for the server to get the request
|
val cs = ConnectionPoolSettings(system)
|
||||||
val serverReceivedRequestAtNanos = Await.result(receivedRequest.future, 2.seconds)
|
val clientTimeout = 345.millis
|
||||||
|
val clientPoolSettings = cs.copy(idleTimeout = clientTimeout)
|
||||||
|
|
||||||
// waiting for the timeout to happen on the client
|
val (_, hostname, port) = TestUtils.temporaryServerHostnameAndPort()
|
||||||
intercept[TimeoutException] { Await.result(clientsResponseFuture, 2.second) }
|
val (receivedRequest: Promise[Long], b1: ServerBinding) = bindServer(hostname, port, serverTimeout)
|
||||||
val actualTimeout = System.nanoTime() - serverReceivedRequestAtNanos
|
|
||||||
actualTimeout.nanos should be >= clientTimeout
|
try {
|
||||||
actualTimeout.nanos should be < serverTimeout
|
val pool = Http().cachedHostConnectionPool[Int](hostname, port, clientPoolSettings)
|
||||||
} finally Await.result(b1.unbind(), 1.second)
|
|
||||||
|
def runRequest(uri: Uri): Future[(Try[HttpResponse], Int)] = {
|
||||||
|
val itNeverSends = Chunked.fromData(ContentTypes.`text/plain`, Source.maybe[ByteString])
|
||||||
|
Source.single(HttpRequest(POST, uri, entity = itNeverSends) -> 1)
|
||||||
|
.via(pool)
|
||||||
|
.runWith(Sink.head)
|
||||||
|
}
|
||||||
|
|
||||||
|
val clientsResponseFuture = runRequest("/")
|
||||||
|
|
||||||
|
// await for the server to get the request
|
||||||
|
val serverReceivedRequestAtNanos = Await.result(receivedRequest.future, 2.seconds)
|
||||||
|
|
||||||
|
// waiting for the timeout to happen on the client
|
||||||
|
intercept[TimeoutException] {
|
||||||
|
Await.result(clientsResponseFuture, 2.second)
|
||||||
|
}
|
||||||
|
val actualTimeout = System.nanoTime() - serverReceivedRequestAtNanos
|
||||||
|
actualTimeout.nanos should be >= clientTimeout
|
||||||
|
actualTimeout.nanos should be < serverTimeout
|
||||||
|
} finally Await.result(b1.unbind(), 1.second)
|
||||||
|
}
|
||||||
|
|
||||||
|
"close connection with idle server after idleTimeout (using request level client API)" in {
|
||||||
|
val serverTimeout = 10.seconds
|
||||||
|
|
||||||
|
val cs = ConnectionPoolSettings(system)
|
||||||
|
val clientTimeout = 345.millis
|
||||||
|
val clientPoolSettings = cs.copy(idleTimeout = clientTimeout)
|
||||||
|
|
||||||
|
val (_, hostname, port) = TestUtils.temporaryServerHostnameAndPort()
|
||||||
|
val (receivedRequest: Promise[Long], b1: ServerBinding) = bindServer(hostname, port, serverTimeout)
|
||||||
|
|
||||||
|
try {
|
||||||
|
def runRequest(uri: Uri): Future[HttpResponse] = {
|
||||||
|
val itNeverSends = Chunked.fromData(ContentTypes.`text/plain`, Source.maybe[ByteString])
|
||||||
|
Http().singleRequest(HttpRequest(POST, uri, entity = itNeverSends), settings = clientPoolSettings)
|
||||||
|
}
|
||||||
|
|
||||||
|
val clientsResponseFuture = runRequest(s"http://$hostname:$port/")
|
||||||
|
|
||||||
|
// await for the server to get the request
|
||||||
|
val serverReceivedRequestAtNanos = Await.result(receivedRequest.future, 2.seconds)
|
||||||
|
|
||||||
|
// waiting for the timeout to happen on the client
|
||||||
|
intercept[TimeoutException] {
|
||||||
|
Await.result(clientsResponseFuture, 3.second)
|
||||||
|
}
|
||||||
|
val actualTimeout = System.nanoTime() - serverReceivedRequestAtNanos
|
||||||
|
actualTimeout.nanos should be >= clientTimeout
|
||||||
|
actualTimeout.nanos should be < serverTimeout
|
||||||
|
} finally Await.result(b1.unbind(), 1.second)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
"log materialization errors in `bindAndHandle`" which {
|
"log materialization errors in `bindAndHandle`" which {
|
||||||
|
|
|
||||||
|
|
@ -619,7 +619,7 @@ private[stream] final class GraphInterpreter(
|
||||||
logic.afterPostStop()
|
logic.afterPostStop()
|
||||||
} catch {
|
} catch {
|
||||||
case NonFatal(e) ⇒
|
case NonFatal(e) ⇒
|
||||||
log.error(s"Error during postStop in [${assembly.stages(logic.stageId)}]", e)
|
log.error(e, s"Error during postStop in [${assembly.stages(logic.stageId)}]")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue