=htc #16507 log materialization errors in bindAndHandle

This commit is contained in:
Mathias 2015-04-23 17:29:25 +02:00
parent 15b95ab0e5
commit 6ce8e0f29d
2 changed files with 47 additions and 12 deletions

View file

@ -8,8 +8,8 @@ import java.net.InetSocketAddress
import java.util.concurrent.ConcurrentHashMap
import akka.http.util.FastFuture
import com.typesafe.config.Config
import scala.util.control.NonFatal
import scala.util.Try
import scala.util.control.NonFatal
import scala.collection.immutable
import scala.concurrent.{ ExecutionContext, Promise, Future }
import akka.event.LoggingAdapter
@ -59,13 +59,20 @@ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.E
* connections are being accepted at maximum rate, which, depending on the applications, might
* present a DoS risk!
*/
def bindAndHandle(handler: Flow[HttpRequest, HttpResponse, _],
def bindAndHandle(handler: Flow[HttpRequest, HttpResponse, Any],
interface: String, port: Int = 80, backlog: Int = 100,
options: immutable.Traversable[Inet.SocketOption] = Nil,
settings: ServerSettings = ServerSettings(system),
log: LoggingAdapter = system.log)(implicit fm: FlowMaterializer): Future[ServerBinding] =
bind(interface, port, backlog, options, settings, log).to {
Sink.foreach { _.flow.join(handler).run() }
Sink.foreach { incomingConnection
try incomingConnection.flow.joinMat(handler)(Keep.both).run()
catch {
case NonFatal(e)
log.error(e, "Could not materialize handling flow for {}", incomingConnection)
throw e
}
}
}.run()
/**

View file

@ -6,7 +6,10 @@ package akka.http
import java.io.{ BufferedReader, BufferedWriter, InputStreamReader, OutputStreamWriter }
import java.net.Socket
import akka.testkit.EventFilter
import scala.annotation.tailrec
import scala.concurrent.Await
import scala.concurrent.duration._
import akka.actor.ActorSystem
import akka.http.TestUtils._
import akka.http.engine.client.ClientConnectionSettings
@ -23,14 +26,12 @@ import akka.stream.testkit.StreamTestKit.{ PublisherProbe, SubscriberProbe }
import com.typesafe.config.{ Config, ConfigFactory }
import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpec }
import scala.annotation.tailrec
import scala.concurrent.Await
import scala.concurrent.duration._
class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll {
val testConf: Config = ConfigFactory.parseString("""
akka.event-handlers = ["akka.testkit.TestEventListener"]
akka.loglevel = WARNING""")
akka.loggers = ["akka.testkit.TestEventListener"]
akka.loglevel = ERROR
akka.stdout-loglevel = ERROR
akka.log-dead-letters = OFF""")
implicit val system = ActorSystem(getClass.getSimpleName, testConf)
implicit val materializer = ActorFlowMaterializer()
@ -77,9 +78,9 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll {
}
}
"run with bindAndHandle" in {
"run with bindAndHandleSync" in {
val (_, hostname, port) = temporaryServerHostnameAndPort()
val binding = Http().bindAndHandle(Flow[HttpRequest].map(_ HttpResponse()), hostname, port)
val binding = Http().bindAndHandleSync(_ HttpResponse(), hostname, port)
val b1 = Await.result(binding, 3.seconds)
val (_, f) = Http().outgoingConnection(hostname, port)
@ -89,6 +90,33 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll {
Await.result(b1.unbind(), 1.second)
}
"log materialization errors in `bindAndHandle`" which {
"are triggered in `transform`" in {
val (_, hostname, port) = temporaryServerHostnameAndPort()
val flow = Flow[HttpRequest].transform[HttpResponse](() sys.error("BOOM"))
val binding = Http().bindAndHandle(flow, hostname, port)
val b1 = Await.result(binding, 3.seconds)
EventFilter[RuntimeException](message = "BOOM", occurrences = 1) intercept {
val (_, responseFuture) = Http().outgoingConnection(hostname, port).runWith(Source.single(HttpRequest()), Sink.head)
Await.result(responseFuture.failed, 1.second) shouldBe a[NoSuchElementException]
}
Await.result(b1.unbind(), 1.second)
}
"are triggered in `mapMaterialized`" in {
val (_, hostname, port) = temporaryServerHostnameAndPort()
val flow = Flow[HttpRequest].map(_ HttpResponse()).mapMaterialized(_ sys.error("BOOM"))
val binding = Http().bindAndHandle(flow, hostname, port)
val b1 = Await.result(binding, 3.seconds)
EventFilter[RuntimeException](message = "BOOM", occurrences = 1) intercept {
val (_, responseFuture) = Http().outgoingConnection(hostname, port).runWith(Source.single(HttpRequest()), Sink.head)
Await.result(responseFuture.failed, 1.second) shouldBe a[NoSuchElementException]
}
Await.result(b1.unbind(), 1.second)
}
}
"properly complete a simple request/response cycle" in new TestSetup {
val (clientOut, clientIn) = openNewClientConnection()
val (serverIn, serverOut) = acceptConnection()