diff --git a/akka-docs-dev/rst/java.rst b/akka-docs-dev/rst/java.rst index 4eb4370ca8..c608687e28 100644 --- a/akka-docs-dev/rst/java.rst +++ b/akka-docs-dev/rst/java.rst @@ -1,10 +1,10 @@ .. _stream-java-api: Java Documentation -=================== +================== .. toctree:: - :maxdepth: 2 + :maxdepth: 3 java/stream-index java/http/index diff --git a/akka-docs-dev/rst/java/code/docs/http/javadsl/server/HighLevelServerBindFailureExample.java b/akka-docs-dev/rst/java/code/docs/http/javadsl/server/HighLevelServerBindFailureExample.java new file mode 100644 index 0000000000..475fa1d8d6 --- /dev/null +++ b/akka-docs-dev/rst/java/code/docs/http/javadsl/server/HighLevelServerBindFailureExample.java @@ -0,0 +1,39 @@ +/* + * Copyright (C) 2009-2014 Typesafe Inc. + */ + +package docs.http.javadsl.server; + +//#binding-failure-high-level-example +import akka.actor.ActorSystem; +import akka.dispatch.OnFailure; +import akka.http.javadsl.model.ContentTypes; +import akka.http.javadsl.server.*; +import akka.http.javadsl.server.values.Parameters; +import akka.http.scaladsl.Http; +import scala.concurrent.Future; + +import java.io.IOException; + +@SuppressWarnings("unchecked") +public class HighLevelServerBindFailureExample { + public static void main(String[] args) throws IOException { + // boot up server using the route as defined below + final ActorSystem system = ActorSystem.create(); + + // HttpApp.bindRoute expects a route being provided by HttpApp.createRoute + Future bindingFuture = + new HighLevelServerExample().bindRoute("localhost", 8080, system); + + bindingFuture.onFailure(new OnFailure() { + @Override + public void onFailure(Throwable failure) throws Throwable { + System.err.println("Something very bad happened! " + failure.getMessage()); + system.shutdown(); + } + }, system.dispatcher()); + + system.shutdown(); + } +} +//#binding-failure-high-level-example diff --git a/akka-docs-dev/rst/java/code/docs/http/javadsl/server/HighLevelServerExample.java b/akka-docs-dev/rst/java/code/docs/http/javadsl/server/HighLevelServerExample.java index 9b471fcaba..3a276f0bab 100644 --- a/akka-docs-dev/rst/java/code/docs/http/javadsl/server/HighLevelServerExample.java +++ b/akka-docs-dev/rst/java/code/docs/http/javadsl/server/HighLevelServerExample.java @@ -6,6 +6,7 @@ package docs.http.javadsl.server; //#high-level-server-example import akka.actor.ActorSystem; +import akka.http.javadsl.model.ContentTypes; import akka.http.javadsl.model.MediaTypes; import akka.http.javadsl.server.*; import akka.http.javadsl.server.values.Parameters; @@ -50,7 +51,7 @@ public class HighLevelServerExample extends HttpApp { // matches the empty path pathSingleSlash().route( // return a constant string with a certain content type - complete(MediaTypes.TEXT_HTML.toContentType(), + complete(ContentTypes.TEXT_HTML, "Hello world!") ), path("ping").route( diff --git a/akka-docs-dev/rst/java/code/docs/http/javadsl/server/HttpServerExampleDocTest.java b/akka-docs-dev/rst/java/code/docs/http/javadsl/server/HttpServerExampleDocTest.java index 3d44d6ecf5..a5b2c4990c 100644 --- a/akka-docs-dev/rst/java/code/docs/http/javadsl/server/HttpServerExampleDocTest.java +++ b/akka-docs-dev/rst/java/code/docs/http/javadsl/server/HttpServerExampleDocTest.java @@ -5,26 +5,46 @@ package docs.http.javadsl.server; import akka.actor.ActorSystem; +import akka.dispatch.OnFailure; +import akka.http.impl.util.JavaMapping; import akka.http.impl.util.Util; import akka.http.javadsl.Http; import akka.http.javadsl.IncomingConnection; import akka.http.javadsl.ServerBinding; import akka.http.javadsl.model.*; +import akka.http.javadsl.model.ContentTypes; +import akka.http.javadsl.model.HttpMethods; +import akka.http.javadsl.model.HttpRequest; +import akka.http.javadsl.model.HttpResponse; +import akka.http.javadsl.model.Uri; +import akka.http.scaladsl.model.*; +import akka.http.scaladsl.model.HttpEntity; +import akka.japi.JavaPartialFunction; import akka.japi.function.Function; import akka.japi.function.Procedure; import akka.stream.ActorMaterializer; import akka.stream.Materializer; +import akka.stream.javadsl.Flow; import akka.stream.javadsl.Sink; import akka.stream.javadsl.Source; +import akka.stream.stage.Context; +import akka.stream.stage.PushStage; +import akka.stream.stage.SyncDirective; +import akka.stream.stage.TerminationDirective; +import akka.util.ByteString; +import scala.Function1; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; +import scala.runtime.BoxedUnit; import java.io.BufferedReader; import java.io.InputStreamReader; import java.util.concurrent.TimeUnit; +@SuppressWarnings("unused") public class HttpServerExampleDocTest { + public static void bindingExample() throws Exception { //#binding-example ActorSystem system = ActorSystem.create(); @@ -45,6 +65,117 @@ public class HttpServerExampleDocTest { //#binding-example Await.result(serverBindingFuture, new FiniteDuration(3, TimeUnit.SECONDS)); } + + public static void bindingFailureExample() throws Exception { + //#binding-failure-handling + ActorSystem system = ActorSystem.create(); + Materializer materializer = ActorMaterializer.create(system); + + Source> serverSource = + Http.get(system).bind("localhost", 80, materializer); + + Future serverBindingFuture = + serverSource.to(Sink.foreach( + new Procedure() { + @Override + public void apply(IncomingConnection connection) throws Exception { + System.out.println("Accepted new connection from " + connection.remoteAddress()); + // ... and then actually handle the connection + } + })).run(materializer); + + serverBindingFuture.onFailure(new OnFailure() { + @Override + public void onFailure(Throwable failure) throws Throwable { + // possibly report the failure somewhere... + } + }, system.dispatcher()); + //#binding-failure-handling + Await.result(serverBindingFuture, new FiniteDuration(3, TimeUnit.SECONDS)); + } + + public static void connectionSourceFailureExample() throws Exception { + //#incoming-connections-source-failure-handling + ActorSystem system = ActorSystem.create(); + Materializer materializer = ActorMaterializer.create(system); + + Source> serverSource = + Http.get(system).bind("localhost", 8080, materializer); + + Flow failureDetection = + Flow.of(IncomingConnection.class).transform(() -> + new PushStage() { + @Override + public SyncDirective onPush(IncomingConnection elem, Context ctx) { + return ctx.push(elem); + } + + @Override + public TerminationDirective onUpstreamFailure(Throwable cause, Context ctx) { + // signal the failure to external monitoring service! + return super.onUpstreamFailure(cause, ctx); + } + }); + + Future serverBindingFuture = + serverSource + .via(failureDetection) // feed signals through our custom stage + .to(Sink.foreach( + new Procedure() { + @Override + public void apply(IncomingConnection connection) throws Exception { + System.out.println("Accepted new connection from " + connection.remoteAddress()); + // ... and then actually handle the connection + } + })).run(materializer); + //#incoming-connections-source-failure-handling + Await.result(serverBindingFuture, new FiniteDuration(3, TimeUnit.SECONDS)); + } + + public static void connectionStreamFailureExample() throws Exception { + //#connection-stream-failure-handling + ActorSystem system = ActorSystem.create(); + Materializer materializer = ActorMaterializer.create(system); + + Source> serverSource = + Http.get(system).bind("localhost", 8080, materializer); + + Flow failureDetection = + Flow.of(HttpRequest.class).transform(() -> + new PushStage() { + @Override + public SyncDirective onPush(HttpRequest elem, Context ctx) { + return ctx.push(elem); + } + + @Override + public TerminationDirective onUpstreamFailure(Throwable cause, Context ctx) { + // signal the failure to external monitoring service! + return super.onUpstreamFailure(cause, ctx); + } + }); + + Flow httpEcho = + Flow.of(HttpRequest.class) + .via(failureDetection) + .map(request -> { + Source bytes = request.entity().getDataBytes(); + HttpEntity.Chunked entity = HttpEntities.create(ContentTypes.TEXT_PLAIN, (Source) bytes); + + return HttpResponse.create() + .withEntity(entity); + }); + + Future serverBindingFuture = + serverSource.to(Sink.foreach(con -> { + System.out.println("Accepted new connection from " + con.remoteAddress()); + con.handleWith(httpEcho, materializer); + } + )).run(materializer); + //#connection-stream-failure-handling + Await.result(serverBindingFuture, new FiniteDuration(3, TimeUnit.SECONDS)); + } + public static void fullServerExample() throws Exception { //#full-server-example ActorSystem system = ActorSystem.create(); @@ -72,7 +203,7 @@ public class HttpServerExampleDocTest { if (uri.path().equals("/")) return HttpResponse.create() - .withEntity(MediaTypes.TEXT_HTML.toContentType(), + .withEntity(ContentTypes.TEXT_HTML, "Hello world!"); else if (uri.path().equals("/hello")) { String name = Util.getOrElse(uri.parameter("name"), "Mister X"); diff --git a/akka-docs-dev/rst/java/http/http-model.rst b/akka-docs-dev/rst/java/http/http-model.rst index ce52320bfb..6783d95212 100644 --- a/akka-docs-dev/rst/java/http/http-model.rst +++ b/akka-docs-dev/rst/java/http/http-model.rst @@ -42,11 +42,11 @@ HttpRequest An ``HttpRequest`` consists of - - a method (GET, POST, etc.) - - a URI - - a seq of headers - - an entity (body data) - - a protocol +- a method (GET, POST, etc.) +- a URI +- a seq of headers +- an entity (body data) +- a protocol Here are some examples how to construct an ``HttpRequest``: @@ -64,10 +64,10 @@ HttpResponse An ``HttpResponse`` consists of - - a status code - - a list of headers - - an entity (body data) - - a protocol +- a status code +- a list of headers +- an entity (body data) +- a protocol Here are some examples how to construct an ``HttpResponse``: diff --git a/akka-docs-dev/rst/java/http/index.rst b/akka-docs-dev/rst/java/http/index.rst index d2094d0701..62dee86967 100644 --- a/akka-docs-dev/rst/java/http/index.rst +++ b/akka-docs-dev/rst/java/http/index.rst @@ -31,11 +31,11 @@ akka-http-jackson .. toctree:: :maxdepth: 2 - configuration http-model server-side/low-level-server-side-api server-side/websocket-support routing-dsl/index client-side/index + configuration .. _jackson: https://github.com/FasterXML/jackson \ No newline at end of file diff --git a/akka-docs-dev/rst/java/http/routing-dsl/directives/index.rst b/akka-docs-dev/rst/java/http/routing-dsl/directives/index.rst index 846e1d1756..99e55c27ce 100644 --- a/akka-docs-dev/rst/java/http/routing-dsl/directives/index.rst +++ b/akka-docs-dev/rst/java/http/routing-dsl/directives/index.rst @@ -6,10 +6,10 @@ Directives A directive is a wrapper for a route or a list of alternative routes that adds one or more of the following functionality to its nested route(s): - * it filters the request and lets only matching requests pass (e.g. the `get` directive lets only GET-requests pass) - * it modifies the request or the ``RequestContext`` (e.g. the `path` directives filters on the unmatched path and then - passes an updated ``RequestContext`` unmatched path) - * it modifies the response coming out of the nested route +* it filters the request and lets only matching requests pass (e.g. the `get` directive lets only GET-requests pass) +* it modifies the request or the ``RequestContext`` (e.g. the `path` directives filters on the unmatched path and then + passes an updated ``RequestContext`` unmatched path) +* it modifies the response coming out of the nested route akka-http provides a set of predefined directives for various tasks. You can access them by either extending from ``akka.http.javadsl.server.AllDirectives`` or by importing them statically with @@ -46,7 +46,7 @@ MethodDirectives MiscDirectives Contains directives that validate a request by user-defined logic. -:ref:`PathDirectives-java` +:ref:`path-directives-java` Contains directives to match and filter on the URI path of the incoming request. RangeDirectives diff --git a/akka-docs-dev/rst/java/http/routing-dsl/directives/path-directives.rst b/akka-docs-dev/rst/java/http/routing-dsl/directives/path-directives.rst index 126ab19a4f..3655e182c9 100644 --- a/akka-docs-dev/rst/java/http/routing-dsl/directives/path-directives.rst +++ b/akka-docs-dev/rst/java/http/routing-dsl/directives/path-directives.rst @@ -1,4 +1,4 @@ -.. _PathDirectives-java: +.. _path-directives-java: PathDirectives ============== diff --git a/akka-docs-dev/rst/java/http/routing-dsl/overview.rst b/akka-docs-dev/rst/java/http/routing-dsl/overview.rst index ccd9c6daca..590af57ae6 100644 --- a/akka-docs-dev/rst/java/http/routing-dsl/overview.rst +++ b/akka-docs-dev/rst/java/http/routing-dsl/overview.rst @@ -74,3 +74,27 @@ quickly without running them over the network and helps with writing assertions Read more about :ref:`http-testkit-java`. .. _DRY: http://en.wikipedia.org/wiki/Don%27t_repeat_yourself + +.. _handling-http-server-failures-high-level-scala: + +Handling HTTP Server failures in the High-Level API +--------------------------------------------------- +There are various situations when failure may occur while initialising or running an Akka HTTP server. +Akka by default will log all these failures, however sometimes one may want to react to failures in addition +to them just being logged, for example by shutting down the actor system, or notifying some external monitoring +end-point explicitly. + +Bind failures +^^^^^^^^^^^^^ +For example the server might be unable to bind to the given port. For example when the port +is already taken by another application, or if the port is privileged (i.e. only usable by ``root``). +In this case the "binding future" will fail immediatly, and we can react to if by listening on the Future's completion: + +.. includecode:: ../../code/docs/http/javadsl/server/HighLevelServerBindFailureExample.java + :include: binding-failure-high-level-example + + +.. note:: + For a more low-level overview of the kinds of failures that can happen and also more fine-grained control over them + refer to the :ref:`handling-http-server-failures-low-level-java` documentation. + diff --git a/akka-docs-dev/rst/java/http/server-side/low-level-server-side-api.rst b/akka-docs-dev/rst/java/http/server-side/low-level-server-side-api.rst index 1f60302120..b01b763133 100644 --- a/akka-docs-dev/rst/java/http/server-side/low-level-server-side-api.rst +++ b/akka-docs-dev/rst/java/http/server-side/low-level-server-side-api.rst @@ -93,9 +93,9 @@ this connection. Requests are handled by calling one of the ``handleWithXXX`` methods with a handler, which can either be - - a ``Flow`` for ``handleWith``, - - a function ``Function`` for ``handleWithSyncHandler``, - - a function ``Function>`` for ``handleWithAsyncHandler``. +- a ``Flow`` for ``handleWith``, +- a function ``Function`` for ``handleWithSyncHandler``, +- a function ``Function>`` for ``handleWithAsyncHandler``. Here is a complete example: @@ -165,4 +165,64 @@ that is a stage that "upgrades" a potentially encrypted raw connection to the HT You create an instance of the layer by calling one of the two overloads of the ``Http.get(system).serverLayer`` method, which also allows for varying degrees of configuration. Note, that the returned instance is not reusable and can only -be materialized once. \ No newline at end of file +be materialized once. + +.. _handling-http-server-failures-low-level-java: + +Handling HTTP Server failures in the Low-Level API +-------------------------------------------------- + +There are various situations when failure may occur while initialising or running an Akka HTTP server. +Akka by default will log all these failures, however sometimes one may want to react to failures in addition to them +just being logged, for example by shutting down the actor system, or notifying some external monitoring end-point explicitly. + +There are multiple things that can fail when creating and materializing an HTTP Server (similarily, the same applied to +a plain streaming ``Tcp`` server). The types of failures that can happen on different layers of the stack, starting +from being unable to start the server, and ending with failing to unmarshal an HttpRequest, examples of failures include +(from outer-most, to inner-most): + +- Failure to ``bind`` to the specified address/port, +- Failure while accepting new ``IncommingConnection`` s, for example when the OS has run out of file descriptors or memory, +- Failure while handling a connection, for example if the incoming ``HttpRequest`` is malformed. + +This section describes how to handle each failure situation, and in which situations these failures may occur. + +Bind failures +^^^^^^^^^^^^^ + +The first type of failure is when the server is unable to bind to the given port. For example when the port +is already taken by another application, or if the port is privileged (i.e. only usable by ``root``). +In this case the "binding future" will fail immediatly, and we can react to if by listening on the Future's completion: + +.. includecode:: ../../code/docs/http/javadsl/server/HttpServerExampleDocTest.java + :include: binding-failure-handling + +Once the server has successfully bound to a port, the ``Source`` starts running and emiting +new incoming connections. This source technically can signal a failure as well, however this should only happen in very +dramantic situations such as running out of file descriptors or memory available to the system, such that it's not able +to accept a new incoming connection. Handling failures in Akka Streams is pretty stright forward, as failures are signaled +through the stream starting from the stage which failed, all the way downstream to the final stages. + +Connections Source failures +^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +In the example below we add a custom ``PushStage`` (see :ref:`stream-customize-java`) in order to react to the +stream's failure. We signal a ``failureMonitor`` actor with the cause why the stream is going down, and let the Actor +handle the rest – maybe it'll decide to restart the server or shutdown the ActorSystem, that however is not our concern anymore. + +.. includecode:: ../../code/docs/http/javadsl/server/HttpServerExampleDocTest.java + :include: incoming-connections-source-failure-handling + +Connection failures +^^^^^^^^^^^^^^^^^^^ + +The third type of failure that can occur is when the connection has been properly established, +however afterwards is terminated abruptly – for example by the client aborting the underlying TCP connection. +To handle this failure we can use the same pattern as in the previous snippet, however apply it to the connection's Flow: + +.. includecode:: ../../code/docs/http/javadsl/server/HttpServerExampleDocTest.java + :include: connection-stream-failure-handling + +These failures can be described more or less infrastructure related, they are failing bindings or connections. +Most of the time you won't need to dive into those very deeply, as Akka will simply log errors of this kind +anyway, which is a reasonable default for such problems. diff --git a/akka-docs-dev/rst/scala.rst b/akka-docs-dev/rst/scala.rst index 15375ba129..78cc44e68e 100644 --- a/akka-docs-dev/rst/scala.rst +++ b/akka-docs-dev/rst/scala.rst @@ -4,7 +4,7 @@ Scala Documentation =================== .. toctree:: - :maxdepth: 2 + :maxdepth: 3 scala/stream-index scala/http/index diff --git a/akka-docs-dev/rst/scala/code/docs/http/scaladsl/HttpServerExampleSpec.scala b/akka-docs-dev/rst/scala/code/docs/http/scaladsl/HttpServerExampleSpec.scala index b3278c2f6e..8834a98e37 100644 --- a/akka-docs-dev/rst/scala/code/docs/http/scaladsl/HttpServerExampleSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/http/scaladsl/HttpServerExampleSpec.scala @@ -3,20 +3,36 @@ */ package docs.http.scaladsl -/* -import scala.concurrent.Future -import org.scalatest.{ WordSpec, Matchers } -import akka.actor.ActorSystem + +import akka.actor.{ ActorRef, ActorSystem } +import akka.event.LoggingAdapter +import akka.http.scaladsl.Http +import akka.http.scaladsl.Http.ServerBinding +import akka.http.scaladsl.model._ +import akka.stream.ActorMaterializer +import akka.stream.scaladsl.{ Flow, Sink } +import akka.stream.stage.{ Context, PushStage } +import akka.testkit.TestActors +import org.scalatest.{ Matchers, WordSpec } +import scala.language.postfixOps + +import scala.concurrent.{ ExecutionContext, Future } class HttpServerExampleSpec extends WordSpec with Matchers { - "binding-example" in { + // never actually called + val log: LoggingAdapter = null + + def compileOnlySpec(body: => Unit) = () + + "binding-example" in compileOnlySpec { + import akka.http.scaladsl.Http import akka.stream.ActorMaterializer import akka.stream.scaladsl._ - import akka.http.scaladsl.Http implicit val system = ActorSystem() implicit val materializer = ActorMaterializer() + implicit val ec = system.dispatcher val serverSource: Source[Http.IncomingConnection, Future[Http.ServerBinding]] = Http().bind(interface = "localhost", port = 8080) @@ -27,12 +43,128 @@ class HttpServerExampleSpec extends WordSpec with Matchers { }).run() } - "full-server-example" in { + "binding-failure-high-level-example" in compileOnlySpec { + import akka.http.scaladsl.Http + import akka.http.scaladsl.server.Directives._ import akka.stream.ActorMaterializer - import akka.stream.scaladsl.Sink + + implicit val system = ActorSystem() + implicit val materializer = ActorMaterializer() + implicit val ec = system.dispatcher + + val handler = get { + complete("Hello world!") + } + + // let's say the OS won't allow us to bind to 80. + val (host, port) = ("localhost", 80) + val bindingFuture: Future[ServerBinding] = + Http().bindAndHandle(handler, host, port) + + bindingFuture onFailure { + case ex: Exception => + log.error(ex, "Failed to bind to {}:{}!", host, port) + } + + } + + // mock values: + val handleConnections: Sink[Http.IncomingConnection, Future[Http.ServerBinding]] = + Sink.ignore.mapMaterializedValue(_ => Future.failed(new Exception(""))) + + "binding-failure-handling" in compileOnlySpec { + implicit val system = ActorSystem() + implicit val materializer = ActorMaterializer() + implicit val ec = system.dispatcher + + // let's say the OS won't allow us to bind to 80. + val (host, port) = ("localhost", 80) + val serverSource = Http().bind(host, port) + + val bindingFuture: Future[ServerBinding] = serverSource + .to(handleConnections) // Sink[Http.IncomingConnection, _] + .run() + + bindingFuture onFailure { + case ex: Exception => + log.error(ex, "Failed to bind to {}:{}!", host, port) + } + } + + object MyExampleMonitoringActor { + def props = TestActors.echoActorProps + } + + "incoming-connections-source-failure-handling" in compileOnlySpec { + implicit val system = ActorSystem() + implicit val materializer = ActorMaterializer() + implicit val ec = system.dispatcher + + import Http._ + val (host, port) = ("localhost", 8080) + val serverSource = Http().bind(host, port) + + val failureMonitor: ActorRef = system.actorOf(MyExampleMonitoringActor.props) + + val reactToTopLevelFailures = Flow[IncomingConnection] + .transform { () => + new PushStage[IncomingConnection, IncomingConnection] { + override def onPush(elem: IncomingConnection, ctx: Context[IncomingConnection]) = + ctx.push(elem) + + override def onUpstreamFailure(cause: Throwable, ctx: Context[IncomingConnection]) = { + failureMonitor ! cause + super.onUpstreamFailure(cause, ctx) + } + } + } + + serverSource + .via(reactToTopLevelFailures) + .to(handleConnections) // Sink[Http.IncomingConnection, _] + .run() + } + + "connection-stream-failure-handling" in compileOnlySpec { + implicit val system = ActorSystem() + implicit val materializer = ActorMaterializer() + implicit val ec = system.dispatcher + + val (host, port) = ("localhost", 8080) + val serverSource = Http().bind(host, port) + + val reactToConnectionFailure = Flow[HttpRequest] + .transform { () => + new PushStage[HttpRequest, HttpRequest] { + override def onPush(elem: HttpRequest, ctx: Context[HttpRequest]) = + ctx.push(elem) + + override def onUpstreamFailure(cause: Throwable, ctx: Context[HttpRequest]) = { + // handle the failure somehow + super.onUpstreamFailure(cause, ctx) + } + } + } + + val httpEcho = Flow[HttpRequest] + .via(reactToConnectionFailure) + .map { request => + // simple text "echo" response: + HttpResponse(entity = HttpEntity(ContentTypes.`text/plain`, request.entity.dataBytes)) + } + + serverSource + .runForeach { con => + con.handleWith(httpEcho) + } + } + + "full-server-example" in compileOnlySpec { import akka.http.scaladsl.Http import akka.http.scaladsl.model.HttpMethods._ import akka.http.scaladsl.model._ + import akka.stream.ActorMaterializer + import akka.stream.scaladsl.Sink implicit val system = ActorSystem() implicit val materializer = ActorMaterializer() @@ -64,11 +196,11 @@ class HttpServerExampleSpec extends WordSpec with Matchers { }).run() } - "low-level-server-example" in { - import akka.stream.ActorMaterializer + "low-level-server-example" in compileOnlySpec { import akka.http.scaladsl.Http import akka.http.scaladsl.model.HttpMethods._ import akka.http.scaladsl.model._ + import akka.stream.ActorMaterializer implicit val system = ActorSystem() implicit val materializer = ActorMaterializer() @@ -93,11 +225,11 @@ class HttpServerExampleSpec extends WordSpec with Matchers { // format: OFF - "high-level-server-example" in { - import akka.stream.ActorMaterializer + "high-level-server-example" in compileOnlySpec { import akka.http.scaladsl.Http - import akka.http.scaladsl.server.Directives._ import akka.http.scaladsl.marshallers.xml.ScalaXmlSupport._ + import akka.http.scaladsl.server.Directives._ + import akka.stream.ActorMaterializer implicit val system = ActorSystem() implicit val materializer = ActorMaterializer() @@ -123,15 +255,16 @@ class HttpServerExampleSpec extends WordSpec with Matchers { Http().bindAndHandle(route, "localhost", 8080) } - "minimal-routing-example" in { - import akka.stream.ActorMaterializer + "minimal-routing-example" in compileOnlySpec { import akka.http.scaladsl.Http - import akka.http.scaladsl.server.Directives._ import akka.http.scaladsl.marshallers.xml.ScalaXmlSupport._ + import akka.http.scaladsl.server.Directives._ + import akka.stream.ActorMaterializer object Main extends App { implicit val system = ActorSystem("my-system") implicit val materializer = ActorMaterializer() + implicit val ec = system.dispatcher val route = path("hello") { @@ -145,24 +278,26 @@ class HttpServerExampleSpec extends WordSpec with Matchers { val bindingFuture = Http().bindAndHandle(route, "localhost", 8080) println(s"Server online at http://localhost:8080/\nPress RETURN to stop...") - Console.readLine() - - import system.dispatcher // for the future transformations + Console.readLine() // for the future transformations bindingFuture .flatMap(_.unbind()) // trigger unbinding from the port .onComplete(_ ⇒ system.shutdown()) // and shutdown when done } } - "long-routing-example" in { + "long-routing-example" in compileOnlySpec { + //#long-routing-example import akka.actor.ActorRef - import akka.util.Timeout - import akka.pattern.ask - import akka.http.scaladsl.marshalling.ToResponseMarshaller - import akka.http.scaladsl.unmarshalling.FromRequestUnmarshaller - import akka.http.scaladsl.model.StatusCodes.MovedPermanently import akka.http.scaladsl.coding.Deflate + import akka.http.scaladsl.marshalling.ToResponseMarshaller + import akka.http.scaladsl.model.StatusCodes.MovedPermanently import akka.http.scaladsl.server.Directives._ + // TODO: these explicit imports are only needed in complex cases, like below; Also, not needed on Scala 2.11 + import akka.http.scaladsl.server.directives.ParameterDirectives.ParamMagnet + import akka.http.scaladsl.server.directives.FormFieldDirectives.FieldMagnet + import akka.http.scaladsl.unmarshalling.FromRequestUnmarshaller + import akka.pattern.ask + import akka.util.Timeout // types used by the API routes type Money = Double // only for demo purposes, don't try this at home! @@ -171,9 +306,21 @@ class HttpServerExampleSpec extends WordSpec with Matchers { case class Order(email: String, amount: Money) case class Update(order: Order) case class OrderItem(i: Int, os: Option[String], s: String) + + // marshalling would usually be derived automatically using libraries implicit val orderUM: FromRequestUnmarshaller[Order] = ??? - implicit val orderM: ToResponseMarshaller[Seq[Order]] = ??? + implicit val orderM: ToResponseMarshaller[Order] = ??? + implicit val orderSeqM: ToResponseMarshaller[Seq[Order]] = ??? implicit val timeout: Timeout = ??? // for actor asks + implicit val ec: ExecutionContext = ??? + implicit val mat: ActorMaterializer = ??? + implicit val sys: ActorSystem = ??? + + // backend entry points + def myAuthenticator: Authenticator[User] = ??? + def retrieveOrdersFromDB: Seq[Order] = ??? + def myDbActor: ActorRef = ??? + def processOrderRequest(id: Int, complete: Order => Unit): Unit = ??? val route = { path("orders") { @@ -247,12 +394,5 @@ class HttpServerExampleSpec extends WordSpec with Matchers { redirect("http://oldapi.example.com/" + pathRest, MovedPermanently) } } - - // backend entry points - def myAuthenticator: Authenticator[User] = ??? - def retrieveOrdersFromDB: Seq[Order] = ??? - def myDbActor: ActorRef = ??? - def processOrderRequest(id: Int, complete: Order => Unit): Unit = ??? } } -*/ \ No newline at end of file diff --git a/akka-docs-dev/rst/scala/code/docs/stream/io/StreamTcpDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/io/StreamTcpDocSpec.scala index fd2e0ac94b..b9bd4ee11e 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/io/StreamTcpDocSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/io/StreamTcpDocSpec.scala @@ -3,19 +3,15 @@ */ package docs.stream.io -import java.net.InetSocketAddress import java.util.concurrent.atomic.AtomicReference import akka.stream._ import akka.stream.scaladsl.Tcp._ import akka.stream.scaladsl._ -import akka.stream.stage.Context -import akka.stream.stage.PushStage -import akka.stream.stage.SyncDirective +import akka.stream.stage.{ Context, PushStage, SyncDirective } import akka.stream.testkit.AkkaSpec import akka.testkit.TestProbe import akka.util.ByteString -import docs.stream.cookbook.RecipeParseLines import docs.utils.TestUtils import scala.concurrent.Future @@ -31,8 +27,14 @@ class StreamTcpDocSpec extends AkkaSpec { "simple server connection" in { { //#echo-server-simple-bind - val connections: Source[IncomingConnection, Future[ServerBinding]] = - Tcp().bind("127.0.0.1", 8888) + val binding: Future[ServerBinding] = + Tcp().bind("127.0.0.1", 8888).to(Sink.ignore).run() + + binding.map { b => + b.unbind() onComplete { + case _ => // ... + } + } //#echo-server-simple-bind } { diff --git a/akka-docs-dev/rst/scala/http/low-level-server-side-api.rst b/akka-docs-dev/rst/scala/http/low-level-server-side-api.rst index 23e67ec865..8c132a6350 100644 --- a/akka-docs-dev/rst/scala/http/low-level-server-side-api.rst +++ b/akka-docs-dev/rst/scala/http/low-level-server-side-api.rst @@ -167,4 +167,68 @@ On the server-side the stand-alone HTTP layer forms a ``BidiFlow`` that is defin :snippet: server-layer You create an instance of ``Http.ServerLayer`` by calling one of the two overloads of the ``Http().serverLayer`` method, -which also allows for varying degrees of configuration. \ No newline at end of file +which also allows for varying degrees of configuration. + +.. _handling-http-server-failures-low-level-scala: + +Handling HTTP Server failures in the Low-Level API +-------------------------------------------------- + +There are various situations when failure may occur while initialising or running an Akka HTTP server. +Akka by default will log all these failures, however sometimes one may want to react to failures in addition to them +just being logged, for example by shutting down the actor system, or notifying some external monitoring end-point explicitly. + +There are multiple things that can fail when creating and materializing an HTTP Server (similarily, the same applied to +a plain streaming ``Tcp()`` server). The types of failures that can happen on different layers of the stack, starting +from being unable to start the server, and ending with failing to unmarshal an HttpRequest, examples of failures include +(from outer-most, to inner-most): + +- Failure to ``bind`` to the specified address/port, +- Failure while accepting new ``IncommingConnection`` s, for example when the OS has run out of file descriptors or memory, +- Failure while handling a connection, for example if the incoming ``HttpRequest`` is malformed. + +This section describes how to handle each failure situation, and in which situations these failures may occur. + +Bind failures +^^^^^^^^^^^^^ + +The first type of failure is when the server is unable to bind to the given port. For example when the port +is already taken by another application, or if the port is privileged (i.e. only usable by ``root``). +In this case the "binding future" will fail immediatly, and we can react to if by listening on the Future's completion: + +.. includecode2:: ../code/docs/http/scaladsl/HttpServerExampleSpec.scala + :snippet: binding-failure-handling + +Once the server has successfully bound to a port, the ``Source[IncomingConnection, _]`` starts running and emiting +new incoming connections. This source technically can signal a failure as well, however this should only happen in very +dramantic situations such as running out of file descriptors or memory available to the system, such that it's not able +to accept a new incoming connection. Handling failures in Akka Streams is pretty stright forward, as failures are signaled +through the stream starting from the stage which failed, all the way downstream to the final stages. + +Connections Source failures +^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +In the example below we add a custom ``PushStage`` (see :ref:`stream-customize-scala`) in order to react to the +stream's failure. We signal a ``failureMonitor`` actor with the cause why the stream is going down, and let the Actor +handle the rest – maybe it'll decide to restart the server or shutdown the ActorSystem, that however is not our concern anymore. + +.. includecode2:: ../code/docs/http/scaladsl/HttpServerExampleSpec.scala + :snippet: incoming-connections-source-failure-handling + +Connection failures +^^^^^^^^^^^^^^^^^^^ + +The third type of failure that can occur is when the connection has been properly established, +however afterwards is terminated abruptly – for example by the client aborting the underlying TCP connection. +To handle this failure we can use the same pattern as in the previous snippet, however apply it to the connection's Flow: + +.. includecode2:: ../code/docs/http/scaladsl/HttpServerExampleSpec.scala + :snippet: connection-stream-failure-handling + +These failures can be described more or less infrastructure related, they are failing bindings or connections. +Most of the time you won't need to dive into those very deeply, as Akka will simply log errors of this kind +anyway, which is a reasonable default for such problems. + +In order to learn more about handling exceptions in the actual routing layer, which is where your application code +comes into the picture, refer to :ref:`exception-handling-scala` which focuses explicitly on explaining how exceptions +thrown in routes can be handled and transformed into :class:`HttpResponse` s with apropriate error codes and human-readable failure descriptions. diff --git a/akka-docs-dev/rst/scala/http/routing-dsl/directives/execution-directives/handleExceptions.rst b/akka-docs-dev/rst/scala/http/routing-dsl/directives/execution-directives/handleExceptions.rst index 25f3cf4948..375f89e1bd 100644 --- a/akka-docs-dev/rst/scala/http/routing-dsl/directives/execution-directives/handleExceptions.rst +++ b/akka-docs-dev/rst/scala/http/routing-dsl/directives/execution-directives/handleExceptions.rst @@ -17,7 +17,7 @@ Description Using this directive is an alternative to using a global implicitly defined ``ExceptionHandler`` that applies to the complete route. -See :ref:`Exception Handling` for general information about options for handling exceptions. +See :ref:`exception-handling-scala` for general information about options for handling exceptions. Example ------- diff --git a/akka-docs-dev/rst/scala/http/routing-dsl/directives/route-directives/failWith.rst b/akka-docs-dev/rst/scala/http/routing-dsl/directives/route-directives/failWith.rst index 3e93a8037e..7c6fe57258 100644 --- a/akka-docs-dev/rst/scala/http/routing-dsl/directives/route-directives/failWith.rst +++ b/akka-docs-dev/rst/scala/http/routing-dsl/directives/route-directives/failWith.rst @@ -4,7 +4,7 @@ failWith ======== Bubbles up the given error through the route structure where it is dealt with by the closest ``handleExceptions`` -directive and its ``ExceptionHandler``. +directive and its :class:`ExceptionHandler`. Signature @@ -19,12 +19,12 @@ Description ``failWith`` explicitly raises an exception that gets bubbled up through the route structure to be picked up by the nearest ``handleExceptions`` directive. Using ``failWith`` rather than simply throwing an exception enables the route -structure's :ref:`Exception Handling` mechanism to deal with the exception even if the current route is executed +structure's :ref:`exception-handling-scala` mechanism to deal with the exception even if the current route is executed asynchronously on another thread (e.g. in a ``Future`` or separate actor). If no ``handleExceptions`` is present above the respective location in the route structure the top-level routing logic will handle the exception and translate it into a corresponding -``HttpResponse`` using the in-scope ``ExceptionHandler`` (see also the :ref:`Exception Handling` chapter). +``HttpResponse`` using the in-scope ``ExceptionHandler`` (see also the :ref:`exception-handling-scala` chapter). There is one notable special case: If the given exception is a ``RejectionError`` exception it is *not* bubbled up, but rather the wrapped exception is unpacked and "executed". This allows the "tunneling" of a rejection via an diff --git a/akka-docs-dev/rst/scala/http/routing-dsl/exception-handling.rst b/akka-docs-dev/rst/scala/http/routing-dsl/exception-handling.rst index c59280c8cd..b2f5169934 100644 --- a/akka-docs-dev/rst/scala/http/routing-dsl/exception-handling.rst +++ b/akka-docs-dev/rst/scala/http/routing-dsl/exception-handling.rst @@ -1,4 +1,4 @@ -.. _Exception Handling: +.. _exception-handling-scala: Exception Handling ================== diff --git a/akka-docs-dev/rst/scala/http/routing-dsl/index.rst b/akka-docs-dev/rst/scala/http/routing-dsl/index.rst index 916406e51c..6e0c25dfa4 100644 --- a/akka-docs-dev/rst/scala/http/routing-dsl/index.rst +++ b/akka-docs-dev/rst/scala/http/routing-dsl/index.rst @@ -43,4 +43,34 @@ not really do anything useful but its definition should give you a feel for what the Routing DSL will look like: .. includecode2:: ../../code/docs/http/scaladsl/HttpServerExampleSpec.scala - :snippet: long-routing-example \ No newline at end of file + :snippet: long-routing-example + +.. _handling-http-server-failures-high-level-scala: + +Handling HTTP Server failures in the High-Level API +--------------------------------------------------- +There are various situations when failure may occur while initialising or running an Akka HTTP server. +Akka by default will log all these failures, however sometimes one may want to react to failures in addition +to them just being logged, for example by shutting down the actor system, or notifying some external monitoring +end-point explicitly. + +Bind failures +^^^^^^^^^^^^^ +For example the server might be unable to bind to the given port. For example when the port +is already taken by another application, or if the port is privileged (i.e. only usable by ``root``). +In this case the "binding future" will fail immediatly, and we can react to if by listening on the Future's completion: + +.. includecode2:: ../../code/docs/http/scaladsl/HttpServerExampleSpec.scala + :snippet: binding-failure-high-level-example + + +.. note:: + For a more low-level overview of the kinds of failures that can happen and also more fine-grained control over them + refer to the :ref:`handling-http-server-failures-low-level-scala` documentation. + +Failures and exceptions inside the Routing DSL +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +Exception handling within the Routing DSL is done by providing :class:`ExceptionHandler` s which are documented in-depth +in the :ref:`exception-handling-scala` section of the documtnation. You can use them to transform exceptions into +:class:`HttpResponse` s with apropriate error codes and human-readable failure descriptions. diff --git a/akka-docs-dev/rst/scala/http/routing-dsl/routes.rst b/akka-docs-dev/rst/scala/http/routing-dsl/routes.rst index dffb3b33b8..999efb7a19 100644 --- a/akka-docs-dev/rst/scala/http/routing-dsl/routes.rst +++ b/akka-docs-dev/rst/scala/http/routing-dsl/routes.rst @@ -14,7 +14,7 @@ Generally when a route receives a request (or rather a ``RequestContext`` for it - Complete the request by returning the value of ``requestContext.complete(...)`` - Reject the request by returning the value of ``requestContext.reject(...)`` (see :ref:`Rejections`) -- Fail the request by returning the value of ``requestContext.fail(...)`` or by just throwing an exception (see :ref:`Exception Handling`) +- Fail the request by returning the value of ``requestContext.fail(...)`` or by just throwing an exception (see :ref:`exception-handling-scala`) - Do any kind of asynchronous processing and instantly return a ``Future[RouteResult]`` to be eventually completed later The first case is pretty clear, by calling ``complete`` a given response is sent to the client as reaction to the diff --git a/akka-http-core/src/main/java/akka/http/javadsl/model/ContentTypes.java b/akka-http-core/src/main/java/akka/http/javadsl/model/ContentTypes.java new file mode 100644 index 0000000000..3b29bcbb1d --- /dev/null +++ b/akka-http-core/src/main/java/akka/http/javadsl/model/ContentTypes.java @@ -0,0 +1,24 @@ +/** + * Copyright (C) 2009-2015 Typesafe Inc. + */ +package akka.http.javadsl.model; + + +/** + * Contains the set of predefined content-types for convenience. + *

+ * If the {@link ContentType} you're looking for is not pre-defined here, + * you can obtain it from a {@link MediaType} by using: {@code MediaTypes.TEXT_HTML.toContentType()} + */ +public final class ContentTypes { + public static final ContentType APPLICATION_JSON = MediaTypes.APPLICATION_JSON.toContentType(); + public static final ContentType APPLICATION_OCTET_STREAM = MediaTypes.APPLICATION_OCTET_STREAM.toContentType(); + + public static final ContentType TEXT_PLAIN = MediaTypes.TEXT_PLAIN.toContentType(); + public static final ContentType TEXT_PLAIN_UTF8 = akka.http.scaladsl.model.ContentTypes.text$divplain$u0028UTF$minus8$u0029(); + public static final ContentType TEXT_HTML = MediaTypes.TEXT_HTML.toContentType(); + public static final ContentType TEXT_XML = MediaTypes.TEXT_XML.toContentType(); + + public static final ContentType APPLICATION_X_WWW_FORM_URLENCODED = MediaTypes.APPLICATION_X_WWW_FORM_URLENCODED.toContentType(); + public static final ContentType MULTIPART_FORM_DATA = MediaTypes.MULTIPART_FORM_DATA.toContentType(); +} diff --git a/akka-http-core/src/main/java/akka/http/javadsl/model/HttpEntities.java b/akka-http-core/src/main/java/akka/http/javadsl/model/HttpEntities.java index 4a8b4f1b96..13ef29a3ec 100644 --- a/akka-http-core/src/main/java/akka/http/javadsl/model/HttpEntities.java +++ b/akka-http-core/src/main/java/akka/http/javadsl/model/HttpEntities.java @@ -7,8 +7,8 @@ package akka.http.javadsl.model; import java.io.File; import akka.http.impl.util.JavaAccessors; -import akka.http.scaladsl.model.*; import akka.http.scaladsl.model.HttpEntity; +import akka.http.scaladsl.model.HttpEntity$; import akka.util.ByteString; import akka.stream.javadsl.Source; diff --git a/akka-http-core/src/main/java/akka/http/javadsl/model/HttpMessage.java b/akka-http-core/src/main/java/akka/http/javadsl/model/HttpMessage.java index 6493b7bad4..53fb55b5e0 100644 --- a/akka-http-core/src/main/java/akka/http/javadsl/model/HttpMessage.java +++ b/akka-http-core/src/main/java/akka/http/javadsl/model/HttpMessage.java @@ -5,6 +5,7 @@ package akka.http.javadsl.model; import akka.japi.Option; +import akka.stream.javadsl.Source; import akka.util.ByteString; import java.io.File; diff --git a/akka-http-core/src/main/scala/akka/http/scaladsl/model/ContentType.scala b/akka-http-core/src/main/scala/akka/http/scaladsl/model/ContentType.scala index 25fc0b6b80..445e148317 100644 --- a/akka-http-core/src/main/scala/akka/http/scaladsl/model/ContentType.scala +++ b/akka-http-core/src/main/scala/akka/http/scaladsl/model/ContentType.scala @@ -90,9 +90,15 @@ object ContentType { object ContentTypes { val `application/json` = ContentType(MediaTypes.`application/json`) + val `application/octet-stream` = ContentType(MediaTypes.`application/octet-stream`) + val `text/plain` = ContentType(MediaTypes.`text/plain`) val `text/plain(UTF-8)` = ContentType(MediaTypes.`text/plain`, HttpCharsets.`UTF-8`) - val `application/octet-stream` = ContentType(MediaTypes.`application/octet-stream`) + val `text/html` = ContentType(MediaTypes.`text/html`) + val `text/xml` = ContentType(MediaTypes.`text/xml`) + + val `application/x-www-form-urlencoded` = ContentType(MediaTypes.`application/x-www-form-urlencoded`) + val `multipart/form-data` = ContentType(MediaTypes.`multipart/form-data`) // used for explicitly suppressing the rendering of Content-Type headers on requests and responses val NoContentType = ContentType(MediaTypes.NoMediaType)