Merge pull request #18602 from ktoso/wip-htc-failure-docs-ktoso

+doc,htc #18600,18597 documents where/how to deal with failure in Http
This commit is contained in:
Konrad Malawski 2015-10-02 11:19:47 +02:00
commit 89a8d670c7
23 changed files with 597 additions and 75 deletions

View file

@ -3,20 +3,36 @@
*/
package docs.http.scaladsl
/*
import scala.concurrent.Future
import org.scalatest.{ WordSpec, Matchers }
import akka.actor.ActorSystem
import akka.actor.{ ActorRef, ActorSystem }
import akka.event.LoggingAdapter
import akka.http.scaladsl.Http
import akka.http.scaladsl.Http.ServerBinding
import akka.http.scaladsl.model._
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Flow, Sink }
import akka.stream.stage.{ Context, PushStage }
import akka.testkit.TestActors
import org.scalatest.{ Matchers, WordSpec }
import scala.language.postfixOps
import scala.concurrent.{ ExecutionContext, Future }
class HttpServerExampleSpec extends WordSpec with Matchers {
"binding-example" in {
// never actually called
val log: LoggingAdapter = null
def compileOnlySpec(body: => Unit) = ()
"binding-example" in compileOnlySpec {
import akka.http.scaladsl.Http
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import akka.http.scaladsl.Http
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
implicit val ec = system.dispatcher
val serverSource: Source[Http.IncomingConnection, Future[Http.ServerBinding]] =
Http().bind(interface = "localhost", port = 8080)
@ -27,12 +43,128 @@ class HttpServerExampleSpec extends WordSpec with Matchers {
}).run()
}
"full-server-example" in {
"binding-failure-high-level-example" in compileOnlySpec {
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives._
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
implicit val ec = system.dispatcher
val handler = get {
complete("Hello world!")
}
// let's say the OS won't allow us to bind to 80.
val (host, port) = ("localhost", 80)
val bindingFuture: Future[ServerBinding] =
Http().bindAndHandle(handler, host, port)
bindingFuture onFailure {
case ex: Exception =>
log.error(ex, "Failed to bind to {}:{}!", host, port)
}
}
// mock values:
val handleConnections: Sink[Http.IncomingConnection, Future[Http.ServerBinding]] =
Sink.ignore.mapMaterializedValue(_ => Future.failed(new Exception("")))
"binding-failure-handling" in compileOnlySpec {
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
implicit val ec = system.dispatcher
// let's say the OS won't allow us to bind to 80.
val (host, port) = ("localhost", 80)
val serverSource = Http().bind(host, port)
val bindingFuture: Future[ServerBinding] = serverSource
.to(handleConnections) // Sink[Http.IncomingConnection, _]
.run()
bindingFuture onFailure {
case ex: Exception =>
log.error(ex, "Failed to bind to {}:{}!", host, port)
}
}
object MyExampleMonitoringActor {
def props = TestActors.echoActorProps
}
"incoming-connections-source-failure-handling" in compileOnlySpec {
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
implicit val ec = system.dispatcher
import Http._
val (host, port) = ("localhost", 8080)
val serverSource = Http().bind(host, port)
val failureMonitor: ActorRef = system.actorOf(MyExampleMonitoringActor.props)
val reactToTopLevelFailures = Flow[IncomingConnection]
.transform { () =>
new PushStage[IncomingConnection, IncomingConnection] {
override def onPush(elem: IncomingConnection, ctx: Context[IncomingConnection]) =
ctx.push(elem)
override def onUpstreamFailure(cause: Throwable, ctx: Context[IncomingConnection]) = {
failureMonitor ! cause
super.onUpstreamFailure(cause, ctx)
}
}
}
serverSource
.via(reactToTopLevelFailures)
.to(handleConnections) // Sink[Http.IncomingConnection, _]
.run()
}
"connection-stream-failure-handling" in compileOnlySpec {
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
implicit val ec = system.dispatcher
val (host, port) = ("localhost", 8080)
val serverSource = Http().bind(host, port)
val reactToConnectionFailure = Flow[HttpRequest]
.transform { () =>
new PushStage[HttpRequest, HttpRequest] {
override def onPush(elem: HttpRequest, ctx: Context[HttpRequest]) =
ctx.push(elem)
override def onUpstreamFailure(cause: Throwable, ctx: Context[HttpRequest]) = {
// handle the failure somehow
super.onUpstreamFailure(cause, ctx)
}
}
}
val httpEcho = Flow[HttpRequest]
.via(reactToConnectionFailure)
.map { request =>
// simple text "echo" response:
HttpResponse(entity = HttpEntity(ContentTypes.`text/plain`, request.entity.dataBytes))
}
serverSource
.runForeach { con =>
con.handleWith(httpEcho)
}
}
"full-server-example" in compileOnlySpec {
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.HttpMethods._
import akka.http.scaladsl.model._
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
@ -64,11 +196,11 @@ class HttpServerExampleSpec extends WordSpec with Matchers {
}).run()
}
"low-level-server-example" in {
import akka.stream.ActorMaterializer
"low-level-server-example" in compileOnlySpec {
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.HttpMethods._
import akka.http.scaladsl.model._
import akka.stream.ActorMaterializer
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
@ -93,11 +225,11 @@ class HttpServerExampleSpec extends WordSpec with Matchers {
// format: OFF
"high-level-server-example" in {
import akka.stream.ActorMaterializer
"high-level-server-example" in compileOnlySpec {
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.marshallers.xml.ScalaXmlSupport._
import akka.http.scaladsl.server.Directives._
import akka.stream.ActorMaterializer
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
@ -123,15 +255,16 @@ class HttpServerExampleSpec extends WordSpec with Matchers {
Http().bindAndHandle(route, "localhost", 8080)
}
"minimal-routing-example" in {
import akka.stream.ActorMaterializer
"minimal-routing-example" in compileOnlySpec {
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.marshallers.xml.ScalaXmlSupport._
import akka.http.scaladsl.server.Directives._
import akka.stream.ActorMaterializer
object Main extends App {
implicit val system = ActorSystem("my-system")
implicit val materializer = ActorMaterializer()
implicit val ec = system.dispatcher
val route =
path("hello") {
@ -145,24 +278,26 @@ class HttpServerExampleSpec extends WordSpec with Matchers {
val bindingFuture = Http().bindAndHandle(route, "localhost", 8080)
println(s"Server online at http://localhost:8080/\nPress RETURN to stop...")
Console.readLine()
import system.dispatcher // for the future transformations
Console.readLine() // for the future transformations
bindingFuture
.flatMap(_.unbind()) // trigger unbinding from the port
.onComplete(_ system.shutdown()) // and shutdown when done
}
}
"long-routing-example" in {
"long-routing-example" in compileOnlySpec {
//#long-routing-example
import akka.actor.ActorRef
import akka.util.Timeout
import akka.pattern.ask
import akka.http.scaladsl.marshalling.ToResponseMarshaller
import akka.http.scaladsl.unmarshalling.FromRequestUnmarshaller
import akka.http.scaladsl.model.StatusCodes.MovedPermanently
import akka.http.scaladsl.coding.Deflate
import akka.http.scaladsl.marshalling.ToResponseMarshaller
import akka.http.scaladsl.model.StatusCodes.MovedPermanently
import akka.http.scaladsl.server.Directives._
// TODO: these explicit imports are only needed in complex cases, like below; Also, not needed on Scala 2.11
import akka.http.scaladsl.server.directives.ParameterDirectives.ParamMagnet
import akka.http.scaladsl.server.directives.FormFieldDirectives.FieldMagnet
import akka.http.scaladsl.unmarshalling.FromRequestUnmarshaller
import akka.pattern.ask
import akka.util.Timeout
// types used by the API routes
type Money = Double // only for demo purposes, don't try this at home!
@ -171,9 +306,21 @@ class HttpServerExampleSpec extends WordSpec with Matchers {
case class Order(email: String, amount: Money)
case class Update(order: Order)
case class OrderItem(i: Int, os: Option[String], s: String)
// marshalling would usually be derived automatically using libraries
implicit val orderUM: FromRequestUnmarshaller[Order] = ???
implicit val orderM: ToResponseMarshaller[Seq[Order]] = ???
implicit val orderM: ToResponseMarshaller[Order] = ???
implicit val orderSeqM: ToResponseMarshaller[Seq[Order]] = ???
implicit val timeout: Timeout = ??? // for actor asks
implicit val ec: ExecutionContext = ???
implicit val mat: ActorMaterializer = ???
implicit val sys: ActorSystem = ???
// backend entry points
def myAuthenticator: Authenticator[User] = ???
def retrieveOrdersFromDB: Seq[Order] = ???
def myDbActor: ActorRef = ???
def processOrderRequest(id: Int, complete: Order => Unit): Unit = ???
val route = {
path("orders") {
@ -247,12 +394,5 @@ class HttpServerExampleSpec extends WordSpec with Matchers {
redirect("http://oldapi.example.com/" + pathRest, MovedPermanently)
}
}
// backend entry points
def myAuthenticator: Authenticator[User] = ???
def retrieveOrdersFromDB: Seq[Order] = ???
def myDbActor: ActorRef = ???
def processOrderRequest(id: Int, complete: Order => Unit): Unit = ???
}
}
*/

View file

@ -3,19 +3,15 @@
*/
package docs.stream.io
import java.net.InetSocketAddress
import java.util.concurrent.atomic.AtomicReference
import akka.stream._
import akka.stream.scaladsl.Tcp._
import akka.stream.scaladsl._
import akka.stream.stage.Context
import akka.stream.stage.PushStage
import akka.stream.stage.SyncDirective
import akka.stream.stage.{ Context, PushStage, SyncDirective }
import akka.stream.testkit.AkkaSpec
import akka.testkit.TestProbe
import akka.util.ByteString
import docs.stream.cookbook.RecipeParseLines
import docs.utils.TestUtils
import scala.concurrent.Future
@ -31,8 +27,14 @@ class StreamTcpDocSpec extends AkkaSpec {
"simple server connection" in {
{
//#echo-server-simple-bind
val connections: Source[IncomingConnection, Future[ServerBinding]] =
Tcp().bind("127.0.0.1", 8888)
val binding: Future[ServerBinding] =
Tcp().bind("127.0.0.1", 8888).to(Sink.ignore).run()
binding.map { b =>
b.unbind() onComplete {
case _ => // ...
}
}
//#echo-server-simple-bind
}
{