diff --git a/akka-http-tests/src/test/scala/akka/http/server/TestServer.scala b/akka-http-tests/src/test/scala/akka/http/server/TestServer.scala index 2ff749c93c..cefab97395 100644 --- a/akka-http-tests/src/test/scala/akka/http/server/TestServer.scala +++ b/akka-http-tests/src/test/scala/akka/http/server/TestServer.scala @@ -17,16 +17,13 @@ import akka.http.model._ object TestServer extends App { val testConf: Config = ConfigFactory.parseString(""" akka.loglevel = INFO - akka.log-dead-letters = off - """) + akka.log-dead-letters = off""") implicit val system = ActorSystem("ServerTest", testConf) import system.dispatcher implicit val materializer = FlowMaterializer() - implicit val askTimeout: Timeout = 500.millis - val serverSource = Http(system).bind(interface = "localhost", port = 8080) - import ScalaRoutingDSL._ + import ScalaXmlSupport._ def auth = HttpBasicAuthenticator.provideUserName { @@ -34,33 +31,31 @@ object TestServer extends App { case _ ⇒ false } - // FIXME: a simple `import ScalaXmlSupport._` should suffice but currently doesn't because - // of #16190 - implicit val html = ScalaXmlSupport.nodeSeqMarshaller(MediaTypes.`text/html`) + val binding = Http().bind(interface = "localhost", port = 8080) - handleConnections(serverSource) withRoute { - get { - path("") { - complete(index) - } ~ - path("secure") { - HttpBasicAuthentication("My very secure site")(auth) { user ⇒ - complete(Hello { user }. Access has been granted!) + val materializedMap = + handleConnections(binding) withRoute { + get { + path("") { + complete(index) + } ~ + path("secure") { + HttpBasicAuthentication("My very secure site")(auth) { user ⇒ + complete(Hello { user }. Access has been granted!) + } + } ~ + path("ping") { + complete("PONG!") + } ~ + path("crash") { + complete(sys.error("BOOM!")) } - } ~ - path("ping") { - complete("PONG!") - } ~ - path("crash") { - complete(sys.error("BOOM!")) - } + } } - } println(s"Server online at http://localhost:8080/\nPress RETURN to stop...") - Console.readLine() - system.shutdown() + binding.unbind(materializedMap).onComplete(_ ⇒ system.shutdown()) lazy val index = diff --git a/akka-http/src/main/scala/akka/http/server/RoutingSetup.scala b/akka-http/src/main/scala/akka/http/server/RoutingSetup.scala index 3e61358f4e..5d4f399272 100644 --- a/akka-http/src/main/scala/akka/http/server/RoutingSetup.scala +++ b/akka-http/src/main/scala/akka/http/server/RoutingSetup.scala @@ -4,13 +4,12 @@ package akka.http.server -import akka.stream.FlowMaterializer - import scala.concurrent.ExecutionContext -import akka.actor.{ ActorSystem, ActorContext } import akka.event.LoggingAdapter -import akka.http.model.HttpRequest +import akka.actor.{ ActorSystem, ActorContext } +import akka.stream.FlowMaterializer import akka.http.Http +import akka.http.model.HttpRequest /** * Provides a ``RoutingSetup`` for a given connection. diff --git a/akka-http/src/main/scala/akka/http/server/ScalaRoutingDSL.scala b/akka-http/src/main/scala/akka/http/server/ScalaRoutingDSL.scala index ebfdb5555a..639ae90aa2 100644 --- a/akka-http/src/main/scala/akka/http/server/ScalaRoutingDSL.scala +++ b/akka-http/src/main/scala/akka/http/server/ScalaRoutingDSL.scala @@ -19,33 +19,43 @@ import FastFuture._ */ trait ScalaRoutingDSL extends Directives { - sealed trait Applicator[R] { - def withRoute(route: Route): R - def withSyncHandler(handler: HttpRequest ⇒ HttpResponse): R - def withAsyncHandler(handler: HttpRequest ⇒ Future[HttpResponse]): R - } + /** + * Handles all connections from the given binding at maximum rate. + */ + def handleConnections(httpServerBinding: Http.ServerBinding)(implicit fm: FlowMaterializer, + setupProvider: RoutingSetupProvider): Applicator = + handleConnections(httpServerBinding.connections) - def handleConnections(serverSource: Http.ServerSource)(implicit fm: FlowMaterializer, - setupProvider: RoutingSetupProvider): Applicator[Unit] = { - new Applicator[Unit] { - def withRoute(route: Route): Unit = + /** + * Handles all connections from the given source at maximum rate. + */ + def handleConnections(connections: Source[Http.IncomingConnection])(implicit fm: FlowMaterializer, + setupProvider: RoutingSetupProvider): Applicator = + new Applicator { + def withRoute(route: Route) = run(routeRunner(route, _)) - def withSyncHandler(handler: HttpRequest ⇒ HttpResponse): Unit = + def withSyncHandler(handler: HttpRequest ⇒ HttpResponse) = withAsyncHandler(request ⇒ FastFuture.successful(handler(request))) - def withAsyncHandler(handler: HttpRequest ⇒ Future[HttpResponse]): Unit = + def withAsyncHandler(handler: HttpRequest ⇒ Future[HttpResponse]) = run(_ ⇒ handler) - private def run(f: RoutingSetup ⇒ HttpRequest ⇒ Future[HttpResponse]): Unit = - serverSource.source.foreach { - case connection @ Http.IncomingConnection(remoteAddress, flow) ⇒ - val setup = setupProvider(connection) - setup.routingLog.log.debug("Accepted new connection from " + remoteAddress) - val runner = f(setup) - flow.join(Flow[HttpRequest].mapAsync(request ⇒ runner(request))).run()(fm) + def run(f: RoutingSetup ⇒ HttpRequest ⇒ Future[HttpResponse]): MaterializedMap = { + val sink = ForeachSink[Http.IncomingConnection] { connection ⇒ + val setup = setupProvider(connection) + setup.routingLog.log.debug("Accepted new connection from " + connection.remoteAddress) + val runner = f(setup) + connection.handleWith(Flow[HttpRequest] mapAsync runner) } + connections.to(sink).run() + } } + + sealed trait Applicator { + def withRoute(route: Route): MaterializedMap + def withSyncHandler(handler: HttpRequest ⇒ HttpResponse): MaterializedMap + def withAsyncHandler(handler: HttpRequest ⇒ Future[HttpResponse]): MaterializedMap } def routeRunner(route: Route, setup: RoutingSetup): HttpRequest ⇒ Future[HttpResponse] = {