From 20f8db99fa8c45e8e4dfa167c3accda163b632d1 Mon Sep 17 00:00:00 2001 From: Mathias Date: Thu, 27 Nov 2014 14:00:39 +0100 Subject: [PATCH] !htc #16162 upgrade HTTP-level APIs to end-user style --- .../docs/http/HttpServerExampleSpec.scala | 20 +- .../src/main/resources/reference.conf | 4 - .../src/main/scala/akka/http/Http.scala | 229 +++++++++++------- .../akka/http/engine/client/HttpClient.scala | 74 ++++++ .../engine/client/HttpClientPipeline.scala | 111 --------- .../engine/client/HttpClientProcessor.scala | 28 --- ...pServerPipeline.scala => HttpServer.scala} | 106 ++++---- .../scala/akka/http/ClientServerSpec.scala | 95 ++++---- .../src/test/scala/akka/http/TestClient.scala | 8 +- .../src/test/scala/akka/http/TestServer.scala | 36 ++- ...ipelineSpec.scala => HttpServerSpec.scala} | 24 +- 11 files changed, 354 insertions(+), 381 deletions(-) create mode 100644 akka-http-core/src/main/scala/akka/http/engine/client/HttpClient.scala delete mode 100644 akka-http-core/src/main/scala/akka/http/engine/client/HttpClientPipeline.scala delete mode 100644 akka-http-core/src/main/scala/akka/http/engine/client/HttpClientProcessor.scala rename akka-http-core/src/main/scala/akka/http/engine/server/{HttpServerPipeline.scala => HttpServer.scala} (63%) rename akka-http-core/src/test/scala/akka/http/engine/server/{HttpServerPipelineSpec.scala => HttpServerSpec.scala} (97%) diff --git a/akka-docs-dev/rst/scala/code/docs/http/HttpServerExampleSpec.scala b/akka-docs-dev/rst/scala/code/docs/http/HttpServerExampleSpec.scala index dbf5e6fa9d..10e84f1f1e 100644 --- a/akka-docs-dev/rst/scala/code/docs/http/HttpServerExampleSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/http/HttpServerExampleSpec.scala @@ -6,6 +6,7 @@ package docs.http import akka.actor.ActorSystem import akka.http.model._ +import akka.stream.scaladsl.Flow import akka.stream.testkit.AkkaSpec class HttpServerExampleSpec @@ -20,11 +21,9 @@ class HttpServerExampleSpec implicit val system = ActorSystem() implicit val materializer = FlowMaterializer() - val Http.ServerSource(source, serverBindingKey) = Http(system).bind(interface = "localhost", port = 8080) - source.foreach { - case Http.IncomingConnection(remoteAddress, flow) ⇒ - println("Accepted new connection from " + remoteAddress) - + val serverBinding = Http(system).bind(interface = "localhost", port = 8080) + for (connection <- serverBinding.connections) { + println("Accepted new connection from " + connection.remoteAddress) // handle connection here } //#bind-example @@ -37,7 +36,7 @@ class HttpServerExampleSpec implicit val system = ActorSystem() implicit val materializer = FlowMaterializer() - val Http.ServerSource(source, serverBindingKey) = Http(system).bind(interface = "localhost", port = 8080) + val serverBinding = Http(system).bind(interface = "localhost", port = 8080) //#full-server-example import akka.http.model.HttpMethods._ @@ -54,13 +53,10 @@ class HttpServerExampleSpec case _: HttpRequest ⇒ HttpResponse(404, entity = "Unknown resource!") } - // ... + serverBinding.connections foreach { connection => + println("Accepted new connection from " + connection.remoteAddress) - source.foreach { - case Http.IncomingConnection(remoteAddress, flow) ⇒ - println("Accepted new connection from " + remoteAddress) - - flow.join(Flow[HttpRequest].map(requestHandler)).run() + connection handleWith { Flow[HttpRequest] map requestHandler } } //#full-server-example } diff --git a/akka-http-core/src/main/resources/reference.conf b/akka-http-core/src/main/resources/reference.conf index bd88d2e5fb..b07f3a5ba6 100644 --- a/akka-http-core/src/main/resources/reference.conf +++ b/akka-http-core/src/main/resources/reference.conf @@ -163,8 +163,4 @@ akka.http { User-Agent = 32 } } - - # Fully qualified config path which holds the dispatcher configuration - # to be used for the HttpManager. - manager-dispatcher = "akka.actor.default-dispatcher" } diff --git a/akka-http-core/src/main/scala/akka/http/Http.scala b/akka-http-core/src/main/scala/akka/http/Http.scala index a4457fb069..77ade08bd9 100644 --- a/akka-http-core/src/main/scala/akka/http/Http.scala +++ b/akka-http-core/src/main/scala/akka/http/Http.scala @@ -4,109 +4,174 @@ package akka.http -import java.io.Closeable import java.net.InetSocketAddress +import com.typesafe.config.Config +import scala.collection.immutable +import scala.concurrent.Future +import akka.event.LoggingAdapter +import akka.util.ByteString +import akka.io.Inet +import akka.stream.FlowMaterializer import akka.stream.io.StreamTcp import akka.stream.scaladsl._ -import scala.collection.immutable -import akka.io.Inet -import akka.http.engine.client.{ HttpClientPipeline, ClientConnectionSettings } -import akka.http.engine.server.{ HttpServerPipeline, ServerSettings } +import akka.http.engine.client.{ HttpClient, ClientConnectionSettings } +import akka.http.engine.server.{ HttpServer, ServerSettings } import akka.http.model.{ ErrorInfo, HttpResponse, HttpRequest } import akka.actor._ -import scala.concurrent.Future - -object Http extends ExtensionKey[HttpExt] with ExtensionIdProvider { +class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.Extension { + import Http._ /** - * A flow representing an outgoing HTTP connection, and the key used to get information about - * the materialized connection. The flow takes pairs of a ``HttpRequest`` and a user definable - * context that will be correlated with the corresponding ``HttpResponse``. + * Creates a [[ServerBinding]] instance which represents a prospective HTTP server binding on the given `endpoint`. */ - final case class OutgoingFlow(flow: Flow[(HttpRequest, Any), (HttpResponse, Any)], - key: Key { type MaterializedType = Future[Http.OutgoingConnection] }) + def bind(interface: String, port: Int = 80, backlog: Int = 100, + options: immutable.Traversable[Inet.SocketOption] = Nil, + settings: Option[ServerSettings] = None, + log: LoggingAdapter = system.log): ServerBinding = { + val endpoint = new InetSocketAddress(interface, port) + val effectiveSettings = ServerSettings(settings) + val tcpBinding = StreamTcp().bind(endpoint, backlog, options, effectiveSettings.timeouts.idleTimeout) + new ServerBinding { + def localAddress(mm: MaterializedMap) = tcpBinding.localAddress(mm) + val connections = tcpBinding.connections map { tcpConn ⇒ + new IncomingConnection { + def localAddress = tcpConn.localAddress + def remoteAddress = tcpConn.remoteAddress + def handleWith(handler: Flow[HttpRequest, HttpResponse])(implicit fm: FlowMaterializer) = + tcpConn.handleWith(HttpServer.serverFlowToTransport(handler, effectiveSettings, log)) + } + } + def unbind(mm: MaterializedMap): Future[Unit] = tcpBinding.unbind(mm) + } + } /** - * The materialized result of an outgoing HTTP connection stream with a single connection as the underlying transport. + * Transforms a given HTTP-level server [[Flow]] into a lower-level TCP transport flow. */ - final case class OutgoingConnection(remoteAddress: InetSocketAddress, - localAddress: InetSocketAddress) + def serverFlowToTransport(serverFlow: Flow[HttpRequest, HttpResponse], + settings: Option[ServerSettings] = None, + log: LoggingAdapter = system.log): Flow[ByteString, ByteString] = { + val effectiveSettings = ServerSettings(settings) + HttpServer.serverFlowToTransport(serverFlow, effectiveSettings, log) + } /** - * A source representing an bound HTTP server socket, and the key to get information about - * the materialized bound socket. + * Creates an [[OutgoingConnection]] instance representing a prospective HTTP client connection to the given endpoint. */ - final case class ServerSource(source: Source[IncomingConnection], - key: Key { type MaterializedType = Future[ServerBinding] }) + def outgoingConnection(host: String, port: Int = 80, + localAddress: Option[InetSocketAddress] = None, + options: immutable.Traversable[Inet.SocketOption] = Nil, + settings: Option[ClientConnectionSettings] = None, + log: LoggingAdapter = system.log): OutgoingConnection = { + val effectiveSettings = ClientConnectionSettings(settings) + val remoteAddr = new InetSocketAddress(host, port) + val transportFlow = StreamTcp().outgoingConnection(remoteAddr, localAddress, + options, effectiveSettings.connectingTimeout, effectiveSettings.idleTimeout) + new OutgoingConnection { + def remoteAddress = remoteAddr + def localAddress(mm: MaterializedMap) = transportFlow.localAddress(mm) + val flow = HttpClient.transportToConnectionClientFlow(transportFlow.flow, remoteAddr, effectiveSettings, log) + } + } /** - * An incoming HTTP connection. + * Transforms the given low-level TCP client transport [[Flow]] into a higher-level HTTP client flow. */ - final case class IncomingConnection(remoteAddress: InetSocketAddress, stream: Flow[HttpResponse, HttpRequest]) + def transportToConnectionClientFlow(transport: Flow[ByteString, ByteString], + remoteAddress: InetSocketAddress, // TODO: removed after #16168 is cleared + settings: Option[ClientConnectionSettings] = None, + log: LoggingAdapter = system.log): Flow[HttpRequest, HttpResponse] = { + val effectiveSettings = ClientConnectionSettings(settings) + HttpClient.transportToConnectionClientFlow(transport, remoteAddress, effectiveSettings, log) + } +} + +object Http extends ExtensionId[HttpExt] with ExtensionIdProvider { + + /** + * Represents a prospective HTTP server binding. + */ + trait ServerBinding { + /** + * The local address of the endpoint bound by the materialization of the `connections` [[Source]] + * whose [[MaterializedMap]] is passed as parameter. + */ + def localAddress(materializedMap: MaterializedMap): Future[InetSocketAddress] + + /** + * The stream of accepted incoming connections. + * Can be materialized several times but only one subscription can be "live" at one time, i.e. + * subsequent materializations will reject subscriptions with an [[StreamTcp.BindFailedException]] if the previous + * materialization still has an uncancelled subscription. + * Cancelling the subscription to a materialization of this source will cause the listening port to be unbound. + */ + def connections: Source[IncomingConnection] + + /** + * Asynchronously triggers the unbinding of the port that was bound by the materialization of the `connections` + * [[Source]] whose [[MaterializedMap]] is passed as parameter. + * + * The produced [[Future]] is fulfilled when the unbinding has been completed. + */ + def unbind(materializedMap: MaterializedMap): Future[Unit] + } + + /** + * Represents one accepted incoming HTTP connection. + */ + sealed trait IncomingConnection { + /** + * The local address this connection is bound to. + */ + def localAddress: InetSocketAddress + + /** + * The remote address this connection is bound to. + */ + def remoteAddress: InetSocketAddress + + /** + * Handles the connection with the given flow, which is materialized exactly once + * and the respective [[MaterializedMap]] returned. + */ + def handleWith(handler: Flow[HttpRequest, HttpResponse])(implicit materializer: FlowMaterializer): MaterializedMap + } + + /** + * Represents a prospective outgoing HTTP connection. + */ + sealed trait OutgoingConnection { + /** + * The remote address this connection is or will be bound to. + */ + def remoteAddress: InetSocketAddress + + /** + * The local address of the endpoint bound by the materialization of the connection materialization + * whose [[MaterializedMap]] is passed as parameter. + */ + def localAddress(mMap: MaterializedMap): Future[InetSocketAddress] + + /** + * A flow representing the HTTP server on a single HTTP connection. + * This flow can be materialized several times, every materialization will open a new connection to the `remoteAddress`. + * If the connection cannot be established the materialized stream will immediately be terminated + * with a [[StreamTcp.ConnectionAttemptFailedException]]. + */ + def flow: Flow[HttpRequest, HttpResponse] + } + + class RequestTimeoutException(val request: HttpRequest, message: String) extends RuntimeException(message) class StreamException(val info: ErrorInfo) extends RuntimeException(info.summary) - /** - * The materialized result of a bound HTTP server socket. - */ - private[akka] sealed abstract case class ServerBinding(localAddress: InetSocketAddress) extends Closeable + //////////////////// EXTENSION SETUP /////////////////// - /** - * INTERNAL API - */ - private[akka] object ServerBinding { - def apply(localAddress: InetSocketAddress, closeable: Closeable): ServerBinding = - new ServerBinding(localAddress) { - override def close() = closeable.close() - } - } -} + def apply()(implicit system: ActorSystem): HttpExt = super.apply(system) -class HttpExt(system: ExtendedActorSystem) extends Extension { - @volatile private[this] var clientPipelines = Map.empty[ClientConnectionSettings, HttpClientPipeline] + def lookup() = Http - def connect(remoteAddress: InetSocketAddress, - localAddress: Option[InetSocketAddress], - options: immutable.Traversable[Inet.SocketOption], - settings: Option[ClientConnectionSettings]): Http.OutgoingFlow = { - // FIXME #16378 Where to do logging? log.debug("Attempting connection to {}", remoteAddress) - val effectiveSettings = ClientConnectionSettings(settings)(system) - - val tcpFlow = StreamTcp(system).connect(remoteAddress, localAddress, options, effectiveSettings.connectingTimeout) - val pipeline = clientPipelines.getOrElse(effectiveSettings, { - val pl = new HttpClientPipeline(effectiveSettings, system.log)(system.dispatcher) - clientPipelines = clientPipelines.updated(effectiveSettings, pl) - pl - }) - pipeline(tcpFlow, remoteAddress) - } - - def connect(host: String, port: Int = 80, - localAddress: Option[InetSocketAddress] = None, - options: immutable.Traversable[Inet.SocketOption] = Nil, - settings: Option[ClientConnectionSettings] = None): Http.OutgoingFlow = - connect(new InetSocketAddress(host, port), localAddress, options, settings) - - def bind(endpoint: InetSocketAddress, - backlog: Int, - options: immutable.Traversable[Inet.SocketOption], - serverSettings: Option[ServerSettings]): Http.ServerSource = { - import system.dispatcher - - // FIXME IdleTimeout? - val src = StreamTcp(system).bind(endpoint, backlog, options) - val key = new Key { - override type MaterializedType = Future[Http.ServerBinding] - override def materialize(map: MaterializedMap) = map.get(src).map(s ⇒ Http.ServerBinding(s.localAddress, s)) - } - val log = system.log - val effectiveSettings = ServerSettings(serverSettings)(system) - Http.ServerSource(src.withKey(key).map(new HttpServerPipeline(effectiveSettings, log)), key) - } - - def bind(interface: String, port: Int = 80, backlog: Int = 100, - options: immutable.Traversable[Inet.SocketOption] = Nil, - serverSettings: Option[ServerSettings] = None): Http.ServerSource = - bind(new InetSocketAddress(interface, port), backlog, options, serverSettings) -} + def createExtension(system: ExtendedActorSystem): HttpExt = + new HttpExt(system.settings.config getConfig "akka.http")(system) +} \ No newline at end of file diff --git a/akka-http-core/src/main/scala/akka/http/engine/client/HttpClient.scala b/akka-http-core/src/main/scala/akka/http/engine/client/HttpClient.scala new file mode 100644 index 0000000000..715ce81fbd --- /dev/null +++ b/akka-http-core/src/main/scala/akka/http/engine/client/HttpClient.scala @@ -0,0 +1,74 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ + +package akka.http.engine.client + +import java.net.InetSocketAddress +import scala.collection.immutable.Queue +import akka.util.ByteString +import akka.event.LoggingAdapter +import akka.stream.FlattenStrategy +import akka.stream.scaladsl._ +import akka.http.model.{ HttpMethod, HttpRequest, HttpResponse } +import akka.http.engine.rendering.{ RequestRenderingContext, HttpRequestRendererFactory } +import akka.http.engine.parsing.{ HttpHeaderParser, HttpResponseParser } +import akka.http.engine.parsing.ParserOutput._ +import akka.http.util._ + +/** + * INTERNAL API + */ +private[http] object HttpClient { + + def transportToConnectionClientFlow(transport: Flow[ByteString, ByteString], + remoteAddress: InetSocketAddress, + settings: ClientConnectionSettings, + log: LoggingAdapter): Flow[HttpRequest, HttpResponse] = { + import settings._ + + // the initial header parser we initially use for every connection, + // will not be mutated, all "shared copy" parsers copy on first-write into the header cache + val rootParser = new HttpResponseParser( + parserSettings, + HttpHeaderParser(parserSettings) { errorInfo ⇒ + if (parserSettings.illegalHeaderWarnings) log.warning(errorInfo.withSummaryPrepended("Illegal response header").formatPretty) + }) + + val requestRendererFactory = new HttpRequestRendererFactory(userAgentHeader, requestHeaderSizeHint, log) + val requestMethodByPass = new RequestMethodByPass(remoteAddress) + + Flow[HttpRequest] + .map(requestMethodByPass) + .transform("renderer", () ⇒ requestRendererFactory.newRenderer) + .flatten(FlattenStrategy.concat) + .transform("errorLogger", () ⇒ errorLogger(log, "Outgoing request stream error")) + .via(transport) + .transform("rootParser", () ⇒ + // each connection uses a single (private) response parser instance for all its responses + // which builds a cache of all header instances seen on that connection + rootParser.createShallowCopy(requestMethodByPass)) + .splitWhen(_.isInstanceOf[MessageStart]) + .headAndTail + .collect { + case (ResponseStart(statusCode, protocol, headers, createEntity, _), entityParts) ⇒ + HttpResponse(statusCode, headers, createEntity(entityParts), protocol) + } + } + + // FIXME: refactor to a pure-stream design that allows us to get rid of this ad-hoc queue here + class RequestMethodByPass(serverAddress: InetSocketAddress) + extends (HttpRequest ⇒ RequestRenderingContext) with (() ⇒ HttpMethod) { + private[this] var requestMethods = Queue.empty[HttpMethod] + def apply(request: HttpRequest) = { + requestMethods = requestMethods.enqueue(request.method) + RequestRenderingContext(request, serverAddress) + } + def apply(): HttpMethod = + if (requestMethods.nonEmpty) { + val method = requestMethods.head + requestMethods = requestMethods.tail + method + } else HttpResponseParser.NoMethod + } +} \ No newline at end of file diff --git a/akka-http-core/src/main/scala/akka/http/engine/client/HttpClientPipeline.scala b/akka-http-core/src/main/scala/akka/http/engine/client/HttpClientPipeline.scala deleted file mode 100644 index f0ca8e73e3..0000000000 --- a/akka-http-core/src/main/scala/akka/http/engine/client/HttpClientPipeline.scala +++ /dev/null @@ -1,111 +0,0 @@ -/** - * Copyright (C) 2009-2014 Typesafe Inc. - */ - -package akka.http.engine.client - -import java.net.InetSocketAddress -import scala.collection.immutable.Queue -import akka.stream.scaladsl._ -import akka.event.LoggingAdapter -import akka.stream.FlattenStrategy -import akka.stream.io.StreamTcp -import akka.util.ByteString -import akka.http.Http -import akka.http.model.{ HttpMethod, HttpRequest, ErrorInfo, HttpResponse } -import akka.http.engine.rendering.{ RequestRenderingContext, HttpRequestRendererFactory } -import akka.http.engine.parsing.{ HttpRequestParser, HttpHeaderParser, HttpResponseParser } -import akka.http.engine.parsing.ParserOutput._ -import akka.http.util._ - -import scala.concurrent.{ ExecutionContext, Future } - -/** - * INTERNAL API - */ -private[http] class HttpClientPipeline(effectiveSettings: ClientConnectionSettings, - log: LoggingAdapter)(implicit ec: ExecutionContext) - extends ((StreamTcp.OutgoingTcpFlow, InetSocketAddress) ⇒ Http.OutgoingFlow) { - - import effectiveSettings._ - - // the initial header parser we initially use for every connection, - // will not be mutated, all "shared copy" parsers copy on first-write into the header cache - val rootParser = new HttpResponseParser( - parserSettings, - HttpHeaderParser(parserSettings) { errorInfo ⇒ - if (parserSettings.illegalHeaderWarnings) log.warning(errorInfo.withSummaryPrepended("Illegal response header").formatPretty) - }) - - val requestRendererFactory = new HttpRequestRendererFactory(userAgentHeader, requestHeaderSizeHint, log) - - def apply(tcpFlow: StreamTcp.OutgoingTcpFlow, remoteAddress: InetSocketAddress): Http.OutgoingFlow = { - import FlowGraphImplicits._ - - val httpKey = new HttpKey(tcpFlow.key) - - val flowWithHttpKey = tcpFlow.flow.withKey(httpKey) - - val requestMethodByPass = new RequestMethodByPass(remoteAddress) - - val pipeline = Flow() { implicit b ⇒ - val userIn = UndefinedSource[(HttpRequest, Any)] - val userOut = UndefinedSink[(HttpResponse, Any)] - - val bypassFanout = Unzip[HttpRequest, Any]("bypassFanout") - val bypassFanin = Zip[HttpResponse, Any]("bypassFanin") - - val requestPipeline = - Flow[HttpRequest] - .map(requestMethodByPass) - .transform("renderer", () ⇒ requestRendererFactory.newRenderer) - .flatten(FlattenStrategy.concat) - .transform("errorLogger", () ⇒ errorLogger(log, "Outgoing request stream error")) - - val responsePipeline = - Flow[ByteString] - .transform("rootParser", () ⇒ - // each connection uses a single (private) response parser instance for all its responses - // which builds a cache of all header instances seen on that connection - rootParser.createShallowCopy(requestMethodByPass)) - .splitWhen(_.isInstanceOf[MessageStart]) - .headAndTail - .collect { - case (ResponseStart(statusCode, protocol, headers, createEntity, _), entityParts) ⇒ - HttpResponse(statusCode, headers, createEntity(entityParts), protocol) - } - - //FIXME: the graph is unnecessary after fixing #15957 - userIn ~> bypassFanout.in - bypassFanout.left ~> requestPipeline ~> flowWithHttpKey ~> responsePipeline ~> bypassFanin.left - bypassFanout.right ~> bypassFanin.right - bypassFanin.out ~> userOut - - userIn -> userOut - } - - Http.OutgoingFlow(pipeline, httpKey) - } - - class RequestMethodByPass(serverAddress: InetSocketAddress) - extends ((HttpRequest) ⇒ RequestRenderingContext) with (() ⇒ HttpMethod) { - private[this] var requestMethods = Queue.empty[HttpMethod] - def apply(request: HttpRequest) = { - requestMethods = requestMethods.enqueue(request.method) - RequestRenderingContext(request, serverAddress) - } - def apply(): HttpMethod = - if (requestMethods.nonEmpty) { - val method = requestMethods.head - requestMethods = requestMethods.tail - method - } else HttpResponseParser.NoMethod - } - - class HttpKey(tcpKey: Key { type MaterializedType = Future[StreamTcp.OutgoingTcpConnection] }) extends Key { - type MaterializedType = Future[Http.OutgoingConnection] - - override def materialize(map: MaterializedMap): MaterializedType = - map.get(tcpKey).map(tcp ⇒ Http.OutgoingConnection(tcp.remoteAddress, tcp.localAddress)) - } -} diff --git a/akka-http-core/src/main/scala/akka/http/engine/client/HttpClientProcessor.scala b/akka-http-core/src/main/scala/akka/http/engine/client/HttpClientProcessor.scala deleted file mode 100644 index ac0cddd68f..0000000000 --- a/akka-http-core/src/main/scala/akka/http/engine/client/HttpClientProcessor.scala +++ /dev/null @@ -1,28 +0,0 @@ -/** - * Copyright (C) 2009-2014 Typesafe Inc. - */ - -package akka.http.engine.client - -import akka.http.model.{ HttpResponse, HttpRequest } -import org.reactivestreams.{ Subscription, Publisher, Subscriber, Processor } - -/** - * A `HttpClientProcessor` models an HTTP client as a stream processor that provides - * responses for requests with an attached context object of a custom type, - * which is funneled through and completely transparent to the processor itself. - */ -trait HttpClientProcessor[T] extends Processor[(HttpRequest, T), (HttpResponse, T)] - -object HttpClientProcessor { - def apply[T](requestSubscriber: Subscriber[(HttpRequest, T)], - responsePublisher: Publisher[(HttpResponse, T)]): HttpClientProcessor[T] = - new HttpClientProcessor[T] { - override def subscribe(s: Subscriber[_ >: (HttpResponse, T)]): Unit = responsePublisher.subscribe(s) - - override def onError(t: Throwable): Unit = requestSubscriber.onError(t) - override def onSubscribe(s: Subscription): Unit = requestSubscriber.onSubscribe(s) - override def onComplete(): Unit = requestSubscriber.onComplete() - override def onNext(t: (HttpRequest, T)): Unit = requestSubscriber.onNext(t) - } -} \ No newline at end of file diff --git a/akka-http-core/src/main/scala/akka/http/engine/server/HttpServerPipeline.scala b/akka-http-core/src/main/scala/akka/http/engine/server/HttpServer.scala similarity index 63% rename from akka-http-core/src/main/scala/akka/http/engine/server/HttpServerPipeline.scala rename to akka-http-core/src/main/scala/akka/http/engine/server/HttpServer.scala index af94c742f5..7974aaf586 100644 --- a/akka-http-core/src/main/scala/akka/http/engine/server/HttpServerPipeline.scala +++ b/akka-http-core/src/main/scala/akka/http/engine/server/HttpServer.scala @@ -4,68 +4,40 @@ package akka.http.engine.server -import akka.actor.{ Props, ActorRef } +import akka.actor.{ ActorRef, Props } +import akka.util.ByteString import akka.event.LoggingAdapter import akka.stream.stage.PushPullStage -import akka.util.ByteString -import akka.stream.io.StreamTcp import akka.stream.FlattenStrategy -import akka.stream.FlowMaterializer import akka.stream.scaladsl._ import akka.http.engine.parsing.{ HttpHeaderParser, HttpRequestParser } import akka.http.engine.rendering.{ ResponseRenderingContext, HttpResponseRendererFactory } -import akka.http.model._ import akka.http.engine.parsing.ParserOutput._ -import akka.http.Http +import akka.http.model._ import akka.http.util._ +import akka.http.Http import scala.util.control.NonFatal /** * INTERNAL API */ -private[http] class HttpServerPipeline(settings: ServerSettings, log: LoggingAdapter) - extends (StreamTcp.IncomingTcpConnection ⇒ Http.IncomingConnection) { - import settings.parserSettings +private[http] object HttpServer { - // the initial header parser we initially use for every connection, - // will not be mutated, all "shared copy" parsers copy on first-write into the header cache - val rootParser = new HttpRequestParser( - parserSettings, - settings.rawRequestUriHeader, - HttpHeaderParser(parserSettings) { errorInfo ⇒ - if (parserSettings.illegalHeaderWarnings) log.warning(errorInfo.withSummaryPrepended("Illegal request header").formatPretty) - }) + def serverFlowToTransport(serverFlow: Flow[HttpRequest, HttpResponse], + settings: ServerSettings, + log: LoggingAdapter): Flow[ByteString, ByteString] = { - val responseRendererFactory = new HttpResponseRendererFactory(settings.serverHeader, settings.responseHeaderSizeHint, log) + // the initial header parser we initially use for every connection, + // will not be mutated, all "shared copy" parsers copy on first-write into the header cache + val rootParser = new HttpRequestParser( + settings.parserSettings, + settings.rawRequestUriHeader, + HttpHeaderParser(settings.parserSettings) { errorInfo ⇒ + if (settings.parserSettings.illegalHeaderWarnings) log.warning(errorInfo.withSummaryPrepended("Illegal request header").formatPretty) + }) - val bypassFanout = Broadcast[RequestOutput]("bypassFanout") - - val bypassMerge = new BypassMerge - - val requestPreparation = - Flow[RequestOutput] - .splitWhen(x ⇒ x.isInstanceOf[MessageStart] || x == MessageEnd) - .headAndTail - .collect { - case (RequestStart(method, uri, protocol, headers, createEntity, _, _), entityParts) ⇒ - val effectiveUri = HttpRequest.effectiveUri(uri, headers, securedConnection = false, settings.defaultHostHeader) - val effectiveMethod = if (method == HttpMethods.HEAD && settings.transparentHeadRequests) HttpMethods.GET else method - HttpRequest(effectiveMethod, effectiveUri, headers, createEntity(entityParts), protocol) - } - - val rendererPipeline = - Flow[ResponseRenderingContext] - .transform("recover", () ⇒ new ErrorsTo500ResponseRecovery(log)) // FIXME: simplify after #16394 is closed - .transform("renderer", () ⇒ responseRendererFactory.newRenderer) - .flatten(FlattenStrategy.concat) - .transform("errorLogger", () ⇒ errorLogger(log, "Outgoing response stream error")) - - def apply(tcpConn: StreamTcp.IncomingTcpConnection): Http.IncomingConnection = { - import FlowGraphImplicits._ - - val userIn = UndefinedSink[HttpRequest] - val userOut = UndefinedSource[HttpResponse] + val responseRendererFactory = new HttpResponseRendererFactory(settings.serverHeader, settings.responseHeaderSizeHint, log) @volatile var oneHundredContinueRef: Option[ActorRef] = None // FIXME: unnecessary after fixing #16168 val oneHundredContinueSource = Source[OneHundredContinue.type] { @@ -76,28 +48,50 @@ private[http] class HttpServerPipeline(settings: ServerSettings, log: LoggingAda } } - // FIXME The whole pipeline can maybe be created up front when #16168 is fixed - val pipeline = Flow() { implicit b ⇒ + val bypassFanout = Broadcast[RequestOutput]("bypassFanout") + val bypassMerge = new BypassMerge(settings, log) - val requestParsing = Flow[ByteString].transform("rootParser", () ⇒ - // each connection uses a single (private) request parser instance for all its requests - // which builds a cache of all header instances seen on that connection - rootParser.createShallowCopy(() ⇒ oneHundredContinueRef)) + val requestParsing = Flow[ByteString].transform("rootParser", () ⇒ + // each connection uses a single (private) request parser instance for all its requests + // which builds a cache of all header instances seen on that connection + rootParser.createShallowCopy(() ⇒ oneHundredContinueRef)) + val requestPreparation = + Flow[RequestOutput] + .splitWhen(x ⇒ x.isInstanceOf[MessageStart] || x == MessageEnd) + .headAndTail + .collect { + case (RequestStart(method, uri, protocol, headers, createEntity, _, _), entityParts) ⇒ + val effectiveUri = HttpRequest.effectiveUri(uri, headers, securedConnection = false, settings.defaultHostHeader) + val effectiveMethod = if (method == HttpMethods.HEAD && settings.transparentHeadRequests) HttpMethods.GET else method + HttpRequest(effectiveMethod, effectiveUri, headers, createEntity(entityParts), protocol) + } + + val rendererPipeline = + Flow[ResponseRenderingContext] + .transform("recover", () ⇒ new ErrorsTo500ResponseRecovery(log)) // FIXME: simplify after #16394 is closed + .transform("renderer", () ⇒ responseRendererFactory.newRenderer) + .flatten(FlattenStrategy.concat) + .transform("errorLogger", () ⇒ errorLogger(log, "Outgoing response stream error")) + + val transportIn = UndefinedSource[ByteString] + val transportOut = UndefinedSink[ByteString] + + import FlowGraphImplicits._ + + Flow() { implicit b ⇒ //FIXME: the graph is unnecessary after fixing #15957 - userOut ~> bypassMerge.applicationInput ~> rendererPipeline ~> tcpConn.stream ~> requestParsing ~> bypassFanout ~> requestPreparation ~> userIn + transportIn ~> requestParsing ~> bypassFanout ~> requestPreparation ~> serverFlow ~> bypassMerge.applicationInput ~> rendererPipeline ~> transportOut bypassFanout ~> bypassMerge.bypassInput oneHundredContinueSource ~> bypassMerge.oneHundredContinueInput b.allowCycles() - userOut -> userIn + transportIn -> transportOut } - - Http.IncomingConnection(tcpConn.remoteAddress, pipeline) } - class BypassMerge extends FlexiMerge[ResponseRenderingContext]("BypassMerge") { + class BypassMerge(settings: ServerSettings, log: LoggingAdapter) extends FlexiMerge[ResponseRenderingContext]("BypassMerge") { import FlexiMerge._ val bypassInput = createInputPort[RequestOutput]() val oneHundredContinueInput = createInputPort[OneHundredContinue.type]() diff --git a/akka-http-core/src/test/scala/akka/http/ClientServerSpec.scala b/akka-http-core/src/test/scala/akka/http/ClientServerSpec.scala index da98db5163..25be0cf585 100644 --- a/akka-http-core/src/test/scala/akka/http/ClientServerSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/ClientServerSpec.scala @@ -12,6 +12,7 @@ import scala.concurrent.Await import scala.concurrent.duration._ import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpec } import akka.actor.ActorSystem +import akka.stream.io.StreamTcp import akka.stream.FlowMaterializer import akka.stream.testkit.StreamTestKit import akka.stream.testkit.StreamTestKit.{ PublisherProbe, SubscriberProbe } @@ -36,45 +37,39 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll { "The server-side HTTP infrastructure" should { - "properly bind and unbind a server" in { + "properly bind a server" in { val (hostname, port) = temporaryServerHostnameAndPort() - val Http.ServerSource(source, key) = Http(system).bind(hostname, port) - val c = StreamTestKit.SubscriberProbe[Http.IncomingConnection]() - val mm = source.to(Sink(c)).run() - val Http.ServerBinding(localAddress) = Await.result(mm.get(key), 3.seconds) - val sub = c.expectSubscription() - localAddress.getHostName shouldEqual hostname - localAddress.getPort shouldEqual port - + val binding = Http().bind(hostname, port) + val probe = StreamTestKit.SubscriberProbe[Http.IncomingConnection]() + binding.connections.runWith(Sink(probe)) + val sub = probe.expectSubscription() // if we get it we are bound sub.cancel() - - // TODO: verify unbinding effect } - "report failure if bind fails" in { + "report failure if bind fails" in pendingUntilFixed { // FIXME: "unpend"! val (hostname, port) = temporaryServerHostnameAndPort() - val Http.ServerSource(source, key) = Http(system).bind(hostname, port) - val c1 = StreamTestKit.SubscriberProbe[Http.IncomingConnection]() - val mm1 = source.to(Sink(c1)).run() - val sub = c1.expectSubscription() - val Http.ServerBinding(localAddress) = Await.result(mm1.get(key), 3.seconds) - localAddress.getHostName shouldEqual hostname - localAddress.getPort shouldEqual port - val c2 = StreamTestKit.SubscriberProbe[Http.IncomingConnection]() - val mm2 = source.to(Sink(c2)).run() - val failure = intercept[akka.stream.io.StreamTcp.IncomingTcpException] { - val serverBinding = Await.result(mm2.get(key), 3.seconds) - } - failure.getMessage should be("Bind failed") - sub.cancel() + val binding = Http().bind(hostname, port) + val probe1 = StreamTestKit.SubscriberProbe[Http.IncomingConnection]() + val mm1 = binding.connections.to(Sink(probe1)).run() + probe1.expectSubscription() + + val probe2 = StreamTestKit.SubscriberProbe[Http.IncomingConnection]() + binding.connections.runWith(Sink(probe2)) + probe2.expectError(StreamTcp.BindFailedException) + + Await.result(binding.unbind(mm1), 1.second) + val probe3 = StreamTestKit.SubscriberProbe[Http.IncomingConnection]() + val mm3 = binding.connections.to(Sink(probe3)).run() + probe3.expectSubscription() // we bound a second time, which means the previous unbind was successful + Await.result(binding.unbind(mm3), 1.second) } "properly complete a simple request/response cycle" in new TestSetup { - val (clientOut, clientIn) = openNewClientConnection[Symbol]() + val (clientOut, clientIn) = openNewClientConnection() val (serverIn, serverOut) = acceptConnection() val clientOutSub = clientOut.expectSubscription() - clientOutSub.sendNext(HttpRequest(uri = "/abc") -> 'abcContext) + clientOutSub.sendNext(HttpRequest(uri = "/abc")) val serverInSub = serverIn.expectSubscription() serverInSub.request(1) @@ -85,12 +80,12 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll { val clientInSub = clientIn.expectSubscription() clientInSub.request(1) - val (response, 'abcContext) = clientIn.expectNext() + val response = clientIn.expectNext() toStrict(response.entity) shouldEqual HttpEntity("yeah") } "properly complete a chunked request/response cycle" in new TestSetup { - val (clientOut, clientIn) = openNewClientConnection[Long]() + val (clientOut, clientIn) = openNewClientConnection() val (serverIn, serverOut) = acceptConnection() val chunks = List(Chunk("abc"), Chunk("defg"), Chunk("hijkl"), LastChunk) @@ -98,7 +93,7 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll { val chunkedEntity = HttpEntity.Chunked(chunkedContentType, Source(chunks)) val clientOutSub = clientOut.expectSubscription() - clientOutSub.sendNext(HttpRequest(POST, "/chunked", List(Accept(MediaRanges.`*/*`)), chunkedEntity) -> 12345678) + clientOutSub.sendNext(HttpRequest(POST, "/chunked", List(Accept(MediaRanges.`*/*`)), chunkedEntity)) val serverInSub = serverIn.expectSubscription() serverInSub.request(1) @@ -112,8 +107,8 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll { val clientInSub = clientIn.expectSubscription() clientInSub.request(1) - val (HttpResponse(StatusCodes.PartialContent, List(Date(_), Server(_), RawHeader("Age", "42")), - Chunked(`chunkedContentType`, chunkStream2), HttpProtocols.`HTTP/1.1`), 12345678) = clientIn.expectNext() + val HttpResponse(StatusCodes.PartialContent, List(Date(_), Server(_), RawHeader("Age", "42")), + Chunked(`chunkedContentType`, chunkStream2), HttpProtocols.`HTTP/1.1`) = clientIn.expectNext() Await.result(chunkStream2.grouped(1000).runWith(Sink.head), 100.millis) shouldEqual chunks } @@ -126,35 +121,35 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll { def configOverrides = "" // automatically bind a server - val connectionStream: SubscriberProbe[Http.IncomingConnection] = { + val connSource = { val settings = configOverrides.toOption.map(ServerSettings.apply) - val Http.ServerSource(source, key) = Http(system).bind(hostname, port, serverSettings = settings) + val binding = Http().bind(hostname, port, settings = settings) val probe = StreamTestKit.SubscriberProbe[Http.IncomingConnection] - source.to(Sink(probe)).run() + binding.connections.runWith(Sink(probe)) probe } - val connectionStreamSub = connectionStream.expectSubscription() + val connSourceSub = connSource.expectSubscription() - def openNewClientConnection[T](settings: Option[ClientConnectionSettings] = None): (PublisherProbe[(HttpRequest, T)], SubscriberProbe[(HttpResponse, T)]) = { - val outgoingFlow = Http(system).connect(hostname, port, settings = settings) - val requestPublisherProbe = StreamTestKit.PublisherProbe[(HttpRequest, T)]() - val responseSubscriberProbe = StreamTestKit.SubscriberProbe[(HttpResponse, T)]() - val tflow = outgoingFlow.flow.asInstanceOf[Flow[((HttpRequest, T)), ((HttpResponse, T))]] - val mm = Flow(Sink(responseSubscriberProbe), Source(requestPublisherProbe)).join(tflow).run() - val connection = Await.result(mm.get(outgoingFlow.key), 3.seconds) - connection.remoteAddress.getPort shouldEqual port + def openNewClientConnection(settings: Option[ClientConnectionSettings] = None): (PublisherProbe[HttpRequest], SubscriberProbe[HttpResponse]) = { + val requestPublisherProbe = StreamTestKit.PublisherProbe[HttpRequest]() + val responseSubscriberProbe = StreamTestKit.SubscriberProbe[HttpResponse]() + val connection = Http().outgoingConnection(hostname, port, settings = settings) connection.remoteAddress.getHostName shouldEqual hostname - + connection.remoteAddress.getPort shouldEqual port + Source(requestPublisherProbe).via(connection.flow).runWith(Sink(responseSubscriberProbe)) requestPublisherProbe -> responseSubscriberProbe } def acceptConnection(): (SubscriberProbe[HttpRequest], PublisherProbe[HttpResponse]) = { - connectionStreamSub.request(1) - val Http.IncomingConnection(remoteAddress, flow) = connectionStream.expectNext() + connSourceSub.request(1) + val incomingConnection = connSource.expectNext() + val sink = PublisherSink[HttpRequest]() + val source = SubscriberSource[HttpResponse]() + val mm = incomingConnection.handleWith(Flow(sink, source)) val requestSubscriberProbe = StreamTestKit.SubscriberProbe[HttpRequest]() val responsePublisherProbe = StreamTestKit.PublisherProbe[HttpResponse]() - Flow(Sink(requestSubscriberProbe), Source(responsePublisherProbe)).join(flow).run() - + mm.get(sink).subscribe(requestSubscriberProbe) + responsePublisherProbe.subscribe(mm.get(source)) requestSubscriberProbe -> responsePublisherProbe } diff --git a/akka-http-core/src/test/scala/akka/http/TestClient.scala b/akka-http-core/src/test/scala/akka/http/TestClient.scala index 4aca8607d6..055859610c 100644 --- a/akka-http-core/src/test/scala/akka/http/TestClient.scala +++ b/akka-http-core/src/test/scala/akka/http/TestClient.scala @@ -17,15 +17,15 @@ object TestClient extends App { akka.log-dead-letters = off """) implicit val system = ActorSystem("ServerTest", testConf) - import akka.http.TestClient.system.dispatcher + implicit val fm = FlowMaterializer() + import system.dispatcher - implicit val materializer = FlowMaterializer() val host = "spray.io" println(s"Fetching HTTP server version of host `$host` ...") - val outgoingFlow = Http(system).connect(host) - val result = Source.singleton(HttpRequest() -> 'NoContext).via(outgoingFlow.flow).map(_._1).runWith(Sink.head) + val connection = Http().outgoingConnection(host) + val result = Source.singleton(HttpRequest()).via(connection.flow).runWith(Sink.head) result.map(_.header[headers.Server]) onComplete { case Success(res) ⇒ println(s"$host is running ${res mkString ", "}") diff --git a/akka-http-core/src/test/scala/akka/http/TestServer.scala b/akka-http-core/src/test/scala/akka/http/TestServer.scala index b5ea8e0d3e..2aafdb1825 100644 --- a/akka-http-core/src/test/scala/akka/http/TestServer.scala +++ b/akka-http-core/src/test/scala/akka/http/TestServer.scala @@ -7,11 +7,9 @@ package akka.http import akka.actor.ActorSystem import akka.http.model._ import akka.stream.FlowMaterializer -import akka.stream.scaladsl.{ Flow, Sink } +import akka.stream.scaladsl.Flow import com.typesafe.config.{ ConfigFactory, Config } import HttpMethods._ -import scala.concurrent.Await -import scala.concurrent.duration._ object TestServer extends App { val testConf: Config = ConfigFactory.parseString(""" @@ -19,30 +17,26 @@ object TestServer extends App { akka.log-dead-letters = off """) implicit val system = ActorSystem("ServerTest", testConf) + implicit val fm = FlowMaterializer() - val requestHandler: HttpRequest ⇒ HttpResponse = { - case HttpRequest(GET, Uri.Path("/"), _, _, _) ⇒ index - case HttpRequest(GET, Uri.Path("/ping"), _, _, _) ⇒ HttpResponse(entity = "PONG!") - case HttpRequest(GET, Uri.Path("/crash"), _, _, _) ⇒ sys.error("BOOM!") - case _: HttpRequest ⇒ HttpResponse(404, entity = "Unknown resource!") + val binding = Http().bind(interface = "localhost", port = 8080) + + for (connection ← binding.connections) { + println("Accepted new connection from " + connection.remoteAddress) + connection handleWith { + Flow[HttpRequest] map { + case HttpRequest(GET, Uri.Path("/"), _, _, _) ⇒ index + case HttpRequest(GET, Uri.Path("/ping"), _, _, _) ⇒ HttpResponse(entity = "PONG!") + case HttpRequest(GET, Uri.Path("/crash"), _, _, _) ⇒ sys.error("BOOM!") + case _: HttpRequest ⇒ HttpResponse(404, entity = "Unknown resource!") + } + } } - implicit val materializer = FlowMaterializer() - - val Http.ServerSource(source, key) = Http(system).bind(interface = "localhost", port = 8080) - val materializedMap = source.to(Sink.foreach { - case Http.IncomingConnection(remoteAddress, flow) ⇒ - println("Accepted new connection from " + remoteAddress) - flow.join(Flow[HttpRequest].map(requestHandler)).run() - }).run() - - val serverBinding = Await.result(materializedMap.get(key), 3 seconds) - - println(s"Server online at http://${serverBinding.localAddress.getHostName}:${serverBinding.localAddress.getPort}") + println(s"Server online at http://localhost:8080") println("Press RETURN to stop...") Console.readLine() - serverBinding.close() system.shutdown() ////////////// helpers ////////////// diff --git a/akka-http-core/src/test/scala/akka/http/engine/server/HttpServerPipelineSpec.scala b/akka-http-core/src/test/scala/akka/http/engine/server/HttpServerSpec.scala similarity index 97% rename from akka-http-core/src/test/scala/akka/http/engine/server/HttpServerPipelineSpec.scala rename to akka-http-core/src/test/scala/akka/http/engine/server/HttpServerSpec.scala index 446aa0567a..b94425def2 100644 --- a/akka-http-core/src/test/scala/akka/http/engine/server/HttpServerPipelineSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/engine/server/HttpServerSpec.scala @@ -10,9 +10,7 @@ import akka.event.NoLogging import akka.util.ByteString import akka.stream.scaladsl._ import akka.stream.FlowMaterializer -import akka.stream.io.StreamTcp import akka.stream.testkit.{ AkkaSpec, StreamTestKit } -import akka.http.Http import akka.http.model._ import akka.http.util._ import headers._ @@ -20,7 +18,7 @@ import HttpEntity._ import MediaTypes._ import HttpMethods._ -class HttpServerPipelineSpec extends AkkaSpec with Matchers with BeforeAndAfterAll with Inside { +class HttpServerSpec extends AkkaSpec with Matchers with BeforeAndAfterAll with Inside { implicit val materializer = FlowMaterializer() "The server implementation" should { @@ -612,14 +610,18 @@ class HttpServerPipelineSpec extends AkkaSpec with Matchers with BeforeAndAfterA } class TestSetup { - val netIn = StreamTestKit.PublisherProbe[ByteString] - val netOut = StreamTestKit.SubscriberProbe[ByteString] - val tcpConnection = StreamTcp.IncomingTcpConnection(null, Flow(Sink(netOut), Source(netIn))) + val requests = StreamTestKit.SubscriberProbe[HttpRequest] + val responses = StreamTestKit.PublisherProbe[HttpResponse] def settings = ServerSettings(system).copy(serverHeader = Some(Server(List(ProductVersion("akka-http", "test"))))) - val pipeline = new HttpServerPipeline(settings, NoLogging) - val Http.IncomingConnection(_, httpPipelineFlow) = pipeline(tcpConnection) + val (netIn, netOut) = { + val netIn = StreamTestKit.PublisherProbe[ByteString] + val netOut = StreamTestKit.SubscriberProbe[ByteString] + val transportFlow = HttpServer.serverFlowToTransport(Flow(Sink(requests), Source(responses)), settings, NoLogging) + Source(netIn).via(transportFlow).runWith(Sink(netOut)) + netIn -> netOut + } def wipeDate(string: String) = string.fastSplit('\n').map { @@ -627,10 +629,6 @@ class HttpServerPipelineSpec extends AkkaSpec with Matchers with BeforeAndAfterA case s ⇒ s }.mkString("\n") - val requests = StreamTestKit.SubscriberProbe[HttpRequest] - val responses = StreamTestKit.PublisherProbe[HttpResponse] - Flow(Sink(requests), Source(responses)).join(httpPipelineFlow).run() - val netInSub = netIn.expectSubscription() val netOutSub = netOut.expectSubscription() val requestsSub = requests.expectSubscription() @@ -647,4 +645,4 @@ class HttpServerPipelineSpec extends AkkaSpec with Matchers with BeforeAndAfterA def closeNetworkInput(): Unit = netInSub.sendComplete() } -} +} \ No newline at end of file