diff --git a/akka-http-core/src/test/scala/akka/http/scaladsl/ClientServerSpec.scala b/akka-http-core/src/test/scala/akka/http/scaladsl/ClientServerSpec.scala index a4b93dcbc0..80f5be196e 100644 --- a/akka-http-core/src/test/scala/akka/http/scaladsl/ClientServerSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/scaladsl/ClientServerSpec.scala @@ -9,13 +9,14 @@ import java.net.{ BindException, Socket } import java.util.concurrent.TimeoutException import akka.actor.ActorSystem +import akka.event.NoLogging import akka.http.impl.util._ import akka.http.scaladsl.Http.ServerBinding import akka.http.scaladsl.model.HttpEntity._ import akka.http.scaladsl.model.HttpMethods._ import akka.http.scaladsl.model._ import akka.http.scaladsl.model.headers._ -import akka.http.{ ClientConnectionSettings, ServerSettings } +import akka.http.{ ConnectionPoolSettings, ClientConnectionSettings, ServerSettings } import akka.stream.scaladsl._ import akka.stream.testkit._ import akka.stream.{ ActorMaterializer, BindFailedException, StreamTcpException } @@ -27,7 +28,7 @@ import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpec } import scala.annotation.tailrec import scala.concurrent.duration._ import scala.concurrent.{ Await, Future, Promise } -import scala.util.Success +import scala.util.{ Try, Success } class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll { val testConf: Config = ConfigFactory.parseString(""" @@ -141,80 +142,151 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll { Await.result(b1.unbind(), 1.second) } - "close connection with idle client after idleTimeout" in { - val (_, hostname, port) = TestUtils.temporaryServerHostnameAndPort() - val s = ServerSettings(system) - val theIdleTimeout = 300.millis - val settings = s.copy(timeouts = s.timeouts.copy(idleTimeout = theIdleTimeout)) + "timeouts" should { + def bindServer(hostname: String, port: Int, serverTimeout: FiniteDuration): (Promise[Long], ServerBinding) = { + val s = ServerSettings(system) + val settings = s.copy(timeouts = s.timeouts.copy(idleTimeout = serverTimeout)) - val receivedRequest = Promise[Long]() + val receivedRequest = Promise[Long]() - def handle(req: HttpRequest): Future[HttpResponse] = { - receivedRequest.complete(Success(System.nanoTime())) - Promise().future // never complete the request with a response; we're waiting for the timeout to happen, nothing else - } - - val binding = Http().bindAndHandleAsync(handle, hostname, port, settings = settings) - val b1 = Await.result(binding, 3.seconds) - - try { - def runIdleRequest(uri: Uri): Future[HttpResponse] = { - val itNeverEnds = Chunked.fromData(ContentTypes.`text/plain`, Source.maybe[ByteString]) - Http().outgoingConnection(hostname, port) - .runWith(Source.single(HttpRequest(PUT, uri, entity = itNeverEnds)), Sink.head) - ._2 + def handle(req: HttpRequest): Future[HttpResponse] = { + receivedRequest.complete(Success(System.nanoTime())) + Promise().future // never complete the request with a response; we're waiting for the timeout to happen, nothing else } - val clientsResponseFuture = runIdleRequest("/") - - // await for the server to get the request - val serverReceivedRequestAtNanos = Await.result(receivedRequest.future, 2.seconds) - - // waiting for the timeout to happen on the client - intercept[StreamTcpException] { Await.result(clientsResponseFuture, 2.second) } - (System.nanoTime() - serverReceivedRequestAtNanos).millis should be >= theIdleTimeout - } finally Await.result(b1.unbind(), 1.second) - } - - "close connection with idle server after idleTimeout" in { - val (_, hostname, port) = TestUtils.temporaryServerHostnameAndPort() - val cs = ClientConnectionSettings(system) - val clientTimeout = 345.millis - val clientSettings = cs.copy(idleTimeout = clientTimeout) - - val s = ServerSettings(system) - val serverTimeout = 10.seconds - val serverSettings = s.copy(timeouts = s.timeouts.copy(idleTimeout = serverTimeout)) - - val receivedRequest = Promise[Long]() - - def handle(req: HttpRequest): Future[HttpResponse] = { - receivedRequest.complete(Success(System.nanoTime())) - Promise().future // never complete the request with a response; we're waiting for the timeout to happen, nothing else + val binding = Http().bindAndHandleAsync(handle, hostname, port, settings = settings) + val b1 = Await.result(binding, 3.seconds) + (receivedRequest, b1) } - val binding = Http().bindAndHandleAsync(handle, hostname, port, settings = serverSettings) - val b1 = Await.result(binding, 3.seconds) + "support server timeouts" should { + "close connection with idle client after idleTimeout" in { + val serverTimeout = 300.millis + val (_, hostname, port) = TestUtils.temporaryServerHostnameAndPort() + val (receivedRequest: Promise[Long], b1: ServerBinding) = bindServer(hostname, port, serverTimeout) - try { - def runRequest(uri: Uri): Future[HttpResponse] = { - val itNeverEnds = Chunked.fromData(ContentTypes.`text/plain`, Source.maybe[ByteString]) - Http().outgoingConnection(hostname, port, settings = clientSettings) - .runWith(Source.single(HttpRequest(POST, uri, entity = itNeverEnds)), Sink.head) - ._2 + try { + def runIdleRequest(uri: Uri): Future[HttpResponse] = { + val itNeverEnds = Chunked.fromData(ContentTypes.`text/plain`, Source.maybe[ByteString]) + Http().outgoingConnection(hostname, port) + .runWith(Source.single(HttpRequest(PUT, uri, entity = itNeverEnds)), Sink.head) + ._2 + } + + val clientsResponseFuture = runIdleRequest("/") + + // await for the server to get the request + val serverReceivedRequestAtNanos = Await.result(receivedRequest.future, 2.seconds) + + // waiting for the timeout to happen on the client + intercept[StreamTcpException] { + Await.result(clientsResponseFuture, 2.second) + } + (System.nanoTime() - serverReceivedRequestAtNanos).millis should be >= serverTimeout + } finally Await.result(b1.unbind(), 1.second) + } + } + + "support client timeouts" should { + "close connection with idle server after idleTimeout (using connection level client API)" in { + val serverTimeout = 10.seconds + + val cs = ClientConnectionSettings(system) + val clientTimeout = 345.millis + val clientSettings = cs.copy(idleTimeout = clientTimeout) + + val (_, hostname, port) = TestUtils.temporaryServerHostnameAndPort() + val (receivedRequest: Promise[Long], b1: ServerBinding) = bindServer(hostname, port, serverTimeout) + + try { + def runRequest(uri: Uri): Future[HttpResponse] = { + val itNeverSends = Chunked.fromData(ContentTypes.`text/plain`, Source.maybe[ByteString]) + Http().outgoingConnection(hostname, port, settings = clientSettings) + .runWith(Source.single(HttpRequest(POST, uri, entity = itNeverSends)), Sink.head) + ._2 + } + + val clientsResponseFuture = runRequest("/") + + // await for the server to get the request + val serverReceivedRequestAtNanos = Await.result(receivedRequest.future, 2.seconds) + + // waiting for the timeout to happen on the client + intercept[TimeoutException] { + Await.result(clientsResponseFuture, 2.second) + } + val actualTimeout = System.nanoTime() - serverReceivedRequestAtNanos + actualTimeout.nanos should be >= clientTimeout + actualTimeout.nanos should be < serverTimeout + } finally Await.result(b1.unbind(), 1.second) } - val clientsResponseFuture = runRequest("/") + "close connection with idle server after idleTimeout (using pool level client API)" in { + val serverTimeout = 10.seconds - // await for the server to get the request - val serverReceivedRequestAtNanos = Await.result(receivedRequest.future, 2.seconds) + val cs = ConnectionPoolSettings(system) + val clientTimeout = 345.millis + val clientPoolSettings = cs.copy(idleTimeout = clientTimeout) - // waiting for the timeout to happen on the client - intercept[TimeoutException] { Await.result(clientsResponseFuture, 2.second) } - val actualTimeout = System.nanoTime() - serverReceivedRequestAtNanos - actualTimeout.nanos should be >= clientTimeout - actualTimeout.nanos should be < serverTimeout - } finally Await.result(b1.unbind(), 1.second) + val (_, hostname, port) = TestUtils.temporaryServerHostnameAndPort() + val (receivedRequest: Promise[Long], b1: ServerBinding) = bindServer(hostname, port, serverTimeout) + + try { + val pool = Http().cachedHostConnectionPool[Int](hostname, port, clientPoolSettings) + + def runRequest(uri: Uri): Future[(Try[HttpResponse], Int)] = { + val itNeverSends = Chunked.fromData(ContentTypes.`text/plain`, Source.maybe[ByteString]) + Source.single(HttpRequest(POST, uri, entity = itNeverSends) -> 1) + .via(pool) + .runWith(Sink.head) + } + + val clientsResponseFuture = runRequest("/") + + // await for the server to get the request + val serverReceivedRequestAtNanos = Await.result(receivedRequest.future, 2.seconds) + + // waiting for the timeout to happen on the client + intercept[TimeoutException] { + Await.result(clientsResponseFuture, 2.second) + } + val actualTimeout = System.nanoTime() - serverReceivedRequestAtNanos + actualTimeout.nanos should be >= clientTimeout + actualTimeout.nanos should be < serverTimeout + } finally Await.result(b1.unbind(), 1.second) + } + + "close connection with idle server after idleTimeout (using request level client API)" in { + val serverTimeout = 10.seconds + + val cs = ConnectionPoolSettings(system) + val clientTimeout = 345.millis + val clientPoolSettings = cs.copy(idleTimeout = clientTimeout) + + val (_, hostname, port) = TestUtils.temporaryServerHostnameAndPort() + val (receivedRequest: Promise[Long], b1: ServerBinding) = bindServer(hostname, port, serverTimeout) + + try { + def runRequest(uri: Uri): Future[HttpResponse] = { + val itNeverSends = Chunked.fromData(ContentTypes.`text/plain`, Source.maybe[ByteString]) + Http().singleRequest(HttpRequest(POST, uri, entity = itNeverSends), settings = clientPoolSettings) + } + + val clientsResponseFuture = runRequest(s"http://$hostname:$port/") + + // await for the server to get the request + val serverReceivedRequestAtNanos = Await.result(receivedRequest.future, 2.seconds) + + // waiting for the timeout to happen on the client + intercept[TimeoutException] { + Await.result(clientsResponseFuture, 3.second) + } + val actualTimeout = System.nanoTime() - serverReceivedRequestAtNanos + actualTimeout.nanos should be >= clientTimeout + actualTimeout.nanos should be < serverTimeout + } finally Await.result(b1.unbind(), 1.second) + } + } } "log materialization errors in `bindAndHandle`" which { diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala index 6dae4b87c5..f047130324 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala @@ -619,7 +619,7 @@ private[stream] final class GraphInterpreter( logic.afterPostStop() } catch { case NonFatal(e) ⇒ - log.error(s"Error during postStop in [${assembly.stages(logic.stageId)}]", e) + log.error(e, s"Error during postStop in [${assembly.stages(logic.stageId)}]") } }