+htc #16972 makes bindAndStartHandlingWith actually run()

This commit is contained in:
Konrad Malawski 2015-03-14 09:55:10 -07:00
parent c383ce46e2
commit ac6723cba7
3 changed files with 33 additions and 21 deletions

View file

@ -60,13 +60,13 @@ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.E
* connections are being accepted at maximum rate, which, depending on the applications, might
* present a DoS risk!
*/
def bindAndstartHandlingWith(handler: Flow[HttpRequest, HttpResponse, _],
def bindAndStartHandlingWith(handler: Flow[HttpRequest, HttpResponse, _],
interface: String, port: Int = 80, backlog: Int = 100,
options: immutable.Traversable[Inet.SocketOption] = Nil,
settings: Option[ServerSettings] = None,
log: LoggingAdapter = system.log)(implicit fm: FlowMaterializer): Future[ServerBinding] = {
bind(interface, port, backlog, options, settings, log).toMat(Sink.foreach { conn
conn.flow.join(handler)
conn.flow.join(handler).run()
})(Keep.left).run()
}
@ -82,7 +82,7 @@ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.E
options: immutable.Traversable[Inet.SocketOption] = Nil,
settings: Option[ServerSettings] = None,
log: LoggingAdapter = system.log)(implicit fm: FlowMaterializer): Future[ServerBinding] =
bindAndstartHandlingWith(Flow[HttpRequest].map(handler), interface, port, backlog, options, settings, log)
bindAndStartHandlingWith(Flow[HttpRequest].map(handler), interface, port, backlog, options, settings, log)
/**
* Materializes the `connections` [[Source]] and handles all connections with the given flow.
@ -96,7 +96,7 @@ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.E
options: immutable.Traversable[Inet.SocketOption] = Nil,
settings: Option[ServerSettings] = None,
log: LoggingAdapter = system.log)(implicit fm: FlowMaterializer): Future[ServerBinding] =
bindAndstartHandlingWith(Flow[HttpRequest].mapAsync(handler), interface, port, backlog, options, settings, log)
bindAndStartHandlingWith(Flow[HttpRequest].mapAsync(handler), interface, port, backlog, options, settings, log)
/**
* Transforms a given HTTP-level server [[Flow]] into a lower-level TCP transport flow.

View file

@ -6,32 +6,32 @@ package akka.http
import java.io.{ BufferedReader, BufferedWriter, InputStreamReader, OutputStreamWriter }
import java.net.Socket
import akka.actor.ActorSystem
import akka.http.TestUtils._
import akka.http.engine.client.ClientConnectionSettings
import akka.http.engine.server.ServerSettings
import akka.http.model.HttpEntity._
import akka.http.model.HttpMethods._
import akka.http.model._
import akka.http.model.headers._
import akka.http.util._
import akka.stream.{ ActorFlowMaterializer, BindFailedException }
import akka.stream.scaladsl._
import akka.stream.testkit.StreamTestKit
import akka.stream.testkit.StreamTestKit.{ PublisherProbe, SubscriberProbe }
import com.typesafe.config.{ Config, ConfigFactory }
import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpec }
import scala.annotation.tailrec
import scala.concurrent.Await
import scala.concurrent.duration._
import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpec }
import akka.actor.ActorSystem
import akka.stream.scaladsl._
import akka.stream.BindFailedException
import akka.stream.ActorFlowMaterializer
import akka.stream.testkit.StreamTestKit
import akka.stream.testkit.StreamTestKit.{ PublisherProbe, SubscriberProbe }
import akka.http.engine.client.ClientConnectionSettings
import akka.http.engine.server.ServerSettings
import akka.http.model._
import akka.http.util._
import headers._
import HttpEntity._
import HttpMethods._
import TestUtils._
class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll {
val testConf: Config = ConfigFactory.parseString("""
akka.event-handlers = ["akka.testkit.TestEventListener"]
akka.loglevel = WARNING""")
implicit val system = ActorSystem(getClass.getSimpleName, testConf)
import system.dispatcher
implicit val materializer = ActorFlowMaterializer()
@ -77,6 +77,18 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll {
}
}
"run with bindAndStartHandlingWith" in {
val (hostname, port) = temporaryServerHostnameAndPort()
val binding = Http().bindAndStartHandlingWith(Flow[HttpRequest].map(_ HttpResponse()), hostname, port)
val b1 = Await.result(binding, 3.seconds)
val (_, f) = Http().outgoingConnection(hostname, port)
.runWith(Source.single(HttpRequest(uri = "/abc")), Sink.head)
Await.result(f, 1.second)
Await.result(b1.unbind(), 1.second)
}
"properly complete a simple request/response cycle" in new TestSetup {
val (clientOut, clientIn) = openNewClientConnection()
val (serverIn, serverOut) = acceptConnection()

View file

@ -28,7 +28,7 @@ object TestServer extends App {
case _ false
}
val bindingFuture = Http().bindAndstartHandlingWith({
val bindingFuture = Http().bindAndStartHandlingWith({
get {
path("") {
complete(index)