diff --git a/akka-http-core/src/test/scala/akka/http/engine/client/HighLevelOutgoingConnectionSpec.scala b/akka-http-core/src/test/scala/akka/http/engine/client/HighLevelOutgoingConnectionSpec.scala new file mode 100644 index 0000000000..ef2877c024 --- /dev/null +++ b/akka-http-core/src/test/scala/akka/http/engine/client/HighLevelOutgoingConnectionSpec.scala @@ -0,0 +1,71 @@ +/* + * Copyright (C) 2009-2014 Typesafe Inc. + */ + +package akka.http.engine.client + +import scala.concurrent.Await +import scala.concurrent.duration._ +import akka.http.{ Http, TestUtils } +import akka.http.model.HttpEntity._ +import akka.http.model._ +import akka.stream.ActorFlowMaterializer +import akka.stream.scaladsl._ +import akka.stream.testkit.AkkaSpec + +class HighLevelOutgoingConnectionSpec extends AkkaSpec("akka.loggers = []\n akka.loglevel = OFF") { + implicit val materializer = ActorFlowMaterializer() + + "The connection-level client implementation" should { + + "be able to handle 100 pipelined requests across one connection" in { + val (serverHostName, serverPort) = TestUtils.temporaryServerHostnameAndPort() + + Http().bindAndHandleSync(r ⇒ HttpResponse(entity = r.uri.toString.reverse.takeWhile(Character.isDigit).reverse), + serverHostName, serverPort) + + val N = 100 + val result = Source(() ⇒ Iterator.from(1)) + .take(N) + .map(id ⇒ HttpRequest(uri = s"/r$id")) + .via(Http().outgoingConnection(serverHostName, serverPort)) + .mapAsync(4, _.entity.toStrict(1.second)) + .map { r ⇒ val s = r.data.utf8String; log.debug(s); s.toInt } + .runFold(0)(_ + _) + + Await.result(result, 10.seconds) shouldEqual N * (N + 1) / 2 + } + + "be able to handle 100 pipelined requests across 4 connections (client-flow is reusable)" in { + val (serverHostName, serverPort) = TestUtils.temporaryServerHostnameAndPort() + + Http().bindAndHandleSync(r ⇒ HttpResponse(entity = r.uri.toString.reverse.takeWhile(Character.isDigit).reverse), + serverHostName, serverPort) + + val connFlow = Http().outgoingConnection(serverHostName, serverPort) + + val C = 4 + val doubleConnection = Flow() { implicit b ⇒ + import FlowGraph.Implicits._ + + val bcast = b.add(Broadcast[HttpRequest](C)) + val merge = b.add(Merge[HttpResponse](C)) + + for (i ← 0 until C) + bcast.out(i) ~> connFlow ~> merge.in(i) + (bcast.in, merge.out) + } + + val N = 100 + val result = Source(() ⇒ Iterator.from(1)) + .take(N) + .map(id ⇒ HttpRequest(uri = s"/r$id")) + .via(doubleConnection) + .mapAsync(4, _.entity.toStrict(1.second)) + .map { r ⇒ val s = r.data.utf8String; log.debug(s); s.toInt } + .runFold(0)(_ + _) + + Await.result(result, 10.seconds) shouldEqual C * N * (N + 1) / 2 + } + } +} \ No newline at end of file diff --git a/akka-http-core/src/test/scala/akka/http/engine/client/OutgoingConnectionSpec.scala b/akka-http-core/src/test/scala/akka/http/engine/client/LowLevelOutgoingConnectionSpec.scala similarity index 98% rename from akka-http-core/src/test/scala/akka/http/engine/client/OutgoingConnectionSpec.scala rename to akka-http-core/src/test/scala/akka/http/engine/client/LowLevelOutgoingConnectionSpec.scala index 059b00d065..9d79cf318b 100644 --- a/akka-http-core/src/test/scala/akka/http/engine/client/OutgoingConnectionSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/engine/client/LowLevelOutgoingConnectionSpec.scala @@ -17,10 +17,10 @@ import akka.http.model._ import akka.http.model.headers._ import akka.http.util._ -class OutgoingConnectionSpec extends AkkaSpec("akka.loggers = []\n akka.loglevel = OFF") with Inside { +class LowLevelOutgoingConnectionSpec extends AkkaSpec("akka.loggers = []\n akka.loglevel = OFF") with Inside { implicit val materializer = ActorFlowMaterializer() - "The client implementation" should { + "The connection-level client implementation" should { "handle a request/response round-trip" which {