diff --git a/akka-http-core/src/main/scala/akka/http/Http.scala b/akka-http-core/src/main/scala/akka/http/Http.scala index 3b32869d01..cdd6770272 100644 --- a/akka-http-core/src/main/scala/akka/http/Http.scala +++ b/akka-http-core/src/main/scala/akka/http/Http.scala @@ -60,13 +60,13 @@ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.E * connections are being accepted at maximum rate, which, depending on the applications, might * present a DoS risk! */ - def bindAndstartHandlingWith(handler: Flow[HttpRequest, HttpResponse, _], + def bindAndStartHandlingWith(handler: Flow[HttpRequest, HttpResponse, _], interface: String, port: Int = 80, backlog: Int = 100, options: immutable.Traversable[Inet.SocketOption] = Nil, settings: Option[ServerSettings] = None, log: LoggingAdapter = system.log)(implicit fm: FlowMaterializer): Future[ServerBinding] = { bind(interface, port, backlog, options, settings, log).toMat(Sink.foreach { conn ⇒ - conn.flow.join(handler) + conn.flow.join(handler).run() })(Keep.left).run() } @@ -82,7 +82,7 @@ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.E options: immutable.Traversable[Inet.SocketOption] = Nil, settings: Option[ServerSettings] = None, log: LoggingAdapter = system.log)(implicit fm: FlowMaterializer): Future[ServerBinding] = - bindAndstartHandlingWith(Flow[HttpRequest].map(handler), interface, port, backlog, options, settings, log) + bindAndStartHandlingWith(Flow[HttpRequest].map(handler), interface, port, backlog, options, settings, log) /** * Materializes the `connections` [[Source]] and handles all connections with the given flow. @@ -96,7 +96,7 @@ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.E options: immutable.Traversable[Inet.SocketOption] = Nil, settings: Option[ServerSettings] = None, log: LoggingAdapter = system.log)(implicit fm: FlowMaterializer): Future[ServerBinding] = - bindAndstartHandlingWith(Flow[HttpRequest].mapAsync(handler), interface, port, backlog, options, settings, log) + bindAndStartHandlingWith(Flow[HttpRequest].mapAsync(handler), interface, port, backlog, options, settings, log) /** * Transforms a given HTTP-level server [[Flow]] into a lower-level TCP transport flow. diff --git a/akka-http-core/src/test/scala/akka/http/ClientServerSpec.scala b/akka-http-core/src/test/scala/akka/http/ClientServerSpec.scala index 01b13ddcd7..08efbb0fcd 100644 --- a/akka-http-core/src/test/scala/akka/http/ClientServerSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/ClientServerSpec.scala @@ -6,32 +6,32 @@ package akka.http import java.io.{ BufferedReader, BufferedWriter, InputStreamReader, OutputStreamWriter } import java.net.Socket + +import akka.actor.ActorSystem +import akka.http.TestUtils._ +import akka.http.engine.client.ClientConnectionSettings +import akka.http.engine.server.ServerSettings +import akka.http.model.HttpEntity._ +import akka.http.model.HttpMethods._ +import akka.http.model._ +import akka.http.model.headers._ +import akka.http.util._ +import akka.stream.{ ActorFlowMaterializer, BindFailedException } +import akka.stream.scaladsl._ +import akka.stream.testkit.StreamTestKit +import akka.stream.testkit.StreamTestKit.{ PublisherProbe, SubscriberProbe } import com.typesafe.config.{ Config, ConfigFactory } +import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpec } + import scala.annotation.tailrec import scala.concurrent.Await import scala.concurrent.duration._ -import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpec } -import akka.actor.ActorSystem -import akka.stream.scaladsl._ -import akka.stream.BindFailedException -import akka.stream.ActorFlowMaterializer -import akka.stream.testkit.StreamTestKit -import akka.stream.testkit.StreamTestKit.{ PublisherProbe, SubscriberProbe } -import akka.http.engine.client.ClientConnectionSettings -import akka.http.engine.server.ServerSettings -import akka.http.model._ -import akka.http.util._ -import headers._ -import HttpEntity._ -import HttpMethods._ -import TestUtils._ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll { val testConf: Config = ConfigFactory.parseString(""" akka.event-handlers = ["akka.testkit.TestEventListener"] akka.loglevel = WARNING""") implicit val system = ActorSystem(getClass.getSimpleName, testConf) - import system.dispatcher implicit val materializer = ActorFlowMaterializer() @@ -77,6 +77,18 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll { } } + "run with bindAndStartHandlingWith" in { + val (hostname, port) = temporaryServerHostnameAndPort() + val binding = Http().bindAndStartHandlingWith(Flow[HttpRequest].map(_ ⇒ HttpResponse()), hostname, port) + val b1 = Await.result(binding, 3.seconds) + + val (_, f) = Http().outgoingConnection(hostname, port) + .runWith(Source.single(HttpRequest(uri = "/abc")), Sink.head) + + Await.result(f, 1.second) + Await.result(b1.unbind(), 1.second) + } + "properly complete a simple request/response cycle" in new TestSetup { val (clientOut, clientIn) = openNewClientConnection() val (serverIn, serverOut) = acceptConnection() diff --git a/akka-http-tests/src/test/scala/akka/http/server/TestServer.scala b/akka-http-tests/src/test/scala/akka/http/server/TestServer.scala index 1d0340f89e..bb9717a1b6 100644 --- a/akka-http-tests/src/test/scala/akka/http/server/TestServer.scala +++ b/akka-http-tests/src/test/scala/akka/http/server/TestServer.scala @@ -28,7 +28,7 @@ object TestServer extends App { case _ ⇒ false } - val bindingFuture = Http().bindAndstartHandlingWith({ + val bindingFuture = Http().bindAndStartHandlingWith({ get { path("") { complete(index)