!htc #16162 upgrade HTTP-level APIs to end-user style

This commit is contained in:
Mathias 2014-11-27 14:00:39 +01:00
parent ab7d10135a
commit 20f8db99fa
11 changed files with 354 additions and 381 deletions

View file

@ -6,6 +6,7 @@ package docs.http
import akka.actor.ActorSystem
import akka.http.model._
import akka.stream.scaladsl.Flow
import akka.stream.testkit.AkkaSpec
class HttpServerExampleSpec
@ -20,11 +21,9 @@ class HttpServerExampleSpec
implicit val system = ActorSystem()
implicit val materializer = FlowMaterializer()
val Http.ServerSource(source, serverBindingKey) = Http(system).bind(interface = "localhost", port = 8080)
source.foreach {
case Http.IncomingConnection(remoteAddress, flow)
println("Accepted new connection from " + remoteAddress)
val serverBinding = Http(system).bind(interface = "localhost", port = 8080)
for (connection <- serverBinding.connections) {
println("Accepted new connection from " + connection.remoteAddress)
// handle connection here
}
//#bind-example
@ -37,7 +36,7 @@ class HttpServerExampleSpec
implicit val system = ActorSystem()
implicit val materializer = FlowMaterializer()
val Http.ServerSource(source, serverBindingKey) = Http(system).bind(interface = "localhost", port = 8080)
val serverBinding = Http(system).bind(interface = "localhost", port = 8080)
//#full-server-example
import akka.http.model.HttpMethods._
@ -54,13 +53,10 @@ class HttpServerExampleSpec
case _: HttpRequest HttpResponse(404, entity = "Unknown resource!")
}
// ...
serverBinding.connections foreach { connection =>
println("Accepted new connection from " + connection.remoteAddress)
source.foreach {
case Http.IncomingConnection(remoteAddress, flow)
println("Accepted new connection from " + remoteAddress)
flow.join(Flow[HttpRequest].map(requestHandler)).run()
connection handleWith { Flow[HttpRequest] map requestHandler }
}
//#full-server-example
}

View file

@ -163,8 +163,4 @@ akka.http {
User-Agent = 32
}
}
# Fully qualified config path which holds the dispatcher configuration
# to be used for the HttpManager.
manager-dispatcher = "akka.actor.default-dispatcher"
}

View file

@ -4,109 +4,174 @@
package akka.http
import java.io.Closeable
import java.net.InetSocketAddress
import com.typesafe.config.Config
import scala.collection.immutable
import scala.concurrent.Future
import akka.event.LoggingAdapter
import akka.util.ByteString
import akka.io.Inet
import akka.stream.FlowMaterializer
import akka.stream.io.StreamTcp
import akka.stream.scaladsl._
import scala.collection.immutable
import akka.io.Inet
import akka.http.engine.client.{ HttpClientPipeline, ClientConnectionSettings }
import akka.http.engine.server.{ HttpServerPipeline, ServerSettings }
import akka.http.engine.client.{ HttpClient, ClientConnectionSettings }
import akka.http.engine.server.{ HttpServer, ServerSettings }
import akka.http.model.{ ErrorInfo, HttpResponse, HttpRequest }
import akka.actor._
import scala.concurrent.Future
object Http extends ExtensionKey[HttpExt] with ExtensionIdProvider {
class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.Extension {
import Http._
/**
* A flow representing an outgoing HTTP connection, and the key used to get information about
* the materialized connection. The flow takes pairs of a ``HttpRequest`` and a user definable
* context that will be correlated with the corresponding ``HttpResponse``.
* Creates a [[ServerBinding]] instance which represents a prospective HTTP server binding on the given `endpoint`.
*/
final case class OutgoingFlow(flow: Flow[(HttpRequest, Any), (HttpResponse, Any)],
key: Key { type MaterializedType = Future[Http.OutgoingConnection] })
def bind(interface: String, port: Int = 80, backlog: Int = 100,
options: immutable.Traversable[Inet.SocketOption] = Nil,
settings: Option[ServerSettings] = None,
log: LoggingAdapter = system.log): ServerBinding = {
val endpoint = new InetSocketAddress(interface, port)
val effectiveSettings = ServerSettings(settings)
val tcpBinding = StreamTcp().bind(endpoint, backlog, options, effectiveSettings.timeouts.idleTimeout)
new ServerBinding {
def localAddress(mm: MaterializedMap) = tcpBinding.localAddress(mm)
val connections = tcpBinding.connections map { tcpConn
new IncomingConnection {
def localAddress = tcpConn.localAddress
def remoteAddress = tcpConn.remoteAddress
def handleWith(handler: Flow[HttpRequest, HttpResponse])(implicit fm: FlowMaterializer) =
tcpConn.handleWith(HttpServer.serverFlowToTransport(handler, effectiveSettings, log))
}
}
def unbind(mm: MaterializedMap): Future[Unit] = tcpBinding.unbind(mm)
}
}
/**
* The materialized result of an outgoing HTTP connection stream with a single connection as the underlying transport.
* Transforms a given HTTP-level server [[Flow]] into a lower-level TCP transport flow.
*/
final case class OutgoingConnection(remoteAddress: InetSocketAddress,
localAddress: InetSocketAddress)
def serverFlowToTransport(serverFlow: Flow[HttpRequest, HttpResponse],
settings: Option[ServerSettings] = None,
log: LoggingAdapter = system.log): Flow[ByteString, ByteString] = {
val effectiveSettings = ServerSettings(settings)
HttpServer.serverFlowToTransport(serverFlow, effectiveSettings, log)
}
/**
* A source representing an bound HTTP server socket, and the key to get information about
* the materialized bound socket.
* Creates an [[OutgoingConnection]] instance representing a prospective HTTP client connection to the given endpoint.
*/
final case class ServerSource(source: Source[IncomingConnection],
key: Key { type MaterializedType = Future[ServerBinding] })
def outgoingConnection(host: String, port: Int = 80,
localAddress: Option[InetSocketAddress] = None,
options: immutable.Traversable[Inet.SocketOption] = Nil,
settings: Option[ClientConnectionSettings] = None,
log: LoggingAdapter = system.log): OutgoingConnection = {
val effectiveSettings = ClientConnectionSettings(settings)
val remoteAddr = new InetSocketAddress(host, port)
val transportFlow = StreamTcp().outgoingConnection(remoteAddr, localAddress,
options, effectiveSettings.connectingTimeout, effectiveSettings.idleTimeout)
new OutgoingConnection {
def remoteAddress = remoteAddr
def localAddress(mm: MaterializedMap) = transportFlow.localAddress(mm)
val flow = HttpClient.transportToConnectionClientFlow(transportFlow.flow, remoteAddr, effectiveSettings, log)
}
}
/**
* An incoming HTTP connection.
* Transforms the given low-level TCP client transport [[Flow]] into a higher-level HTTP client flow.
*/
final case class IncomingConnection(remoteAddress: InetSocketAddress, stream: Flow[HttpResponse, HttpRequest])
def transportToConnectionClientFlow(transport: Flow[ByteString, ByteString],
remoteAddress: InetSocketAddress, // TODO: removed after #16168 is cleared
settings: Option[ClientConnectionSettings] = None,
log: LoggingAdapter = system.log): Flow[HttpRequest, HttpResponse] = {
val effectiveSettings = ClientConnectionSettings(settings)
HttpClient.transportToConnectionClientFlow(transport, remoteAddress, effectiveSettings, log)
}
}
object Http extends ExtensionId[HttpExt] with ExtensionIdProvider {
/**
* Represents a prospective HTTP server binding.
*/
trait ServerBinding {
/**
* The local address of the endpoint bound by the materialization of the `connections` [[Source]]
* whose [[MaterializedMap]] is passed as parameter.
*/
def localAddress(materializedMap: MaterializedMap): Future[InetSocketAddress]
/**
* The stream of accepted incoming connections.
* Can be materialized several times but only one subscription can be "live" at one time, i.e.
* subsequent materializations will reject subscriptions with an [[StreamTcp.BindFailedException]] if the previous
* materialization still has an uncancelled subscription.
* Cancelling the subscription to a materialization of this source will cause the listening port to be unbound.
*/
def connections: Source[IncomingConnection]
/**
* Asynchronously triggers the unbinding of the port that was bound by the materialization of the `connections`
* [[Source]] whose [[MaterializedMap]] is passed as parameter.
*
* The produced [[Future]] is fulfilled when the unbinding has been completed.
*/
def unbind(materializedMap: MaterializedMap): Future[Unit]
}
/**
* Represents one accepted incoming HTTP connection.
*/
sealed trait IncomingConnection {
/**
* The local address this connection is bound to.
*/
def localAddress: InetSocketAddress
/**
* The remote address this connection is bound to.
*/
def remoteAddress: InetSocketAddress
/**
* Handles the connection with the given flow, which is materialized exactly once
* and the respective [[MaterializedMap]] returned.
*/
def handleWith(handler: Flow[HttpRequest, HttpResponse])(implicit materializer: FlowMaterializer): MaterializedMap
}
/**
* Represents a prospective outgoing HTTP connection.
*/
sealed trait OutgoingConnection {
/**
* The remote address this connection is or will be bound to.
*/
def remoteAddress: InetSocketAddress
/**
* The local address of the endpoint bound by the materialization of the connection materialization
* whose [[MaterializedMap]] is passed as parameter.
*/
def localAddress(mMap: MaterializedMap): Future[InetSocketAddress]
/**
* A flow representing the HTTP server on a single HTTP connection.
* This flow can be materialized several times, every materialization will open a new connection to the `remoteAddress`.
* If the connection cannot be established the materialized stream will immediately be terminated
* with a [[StreamTcp.ConnectionAttemptFailedException]].
*/
def flow: Flow[HttpRequest, HttpResponse]
}
class RequestTimeoutException(val request: HttpRequest, message: String) extends RuntimeException(message)
class StreamException(val info: ErrorInfo) extends RuntimeException(info.summary)
/**
* The materialized result of a bound HTTP server socket.
*/
private[akka] sealed abstract case class ServerBinding(localAddress: InetSocketAddress) extends Closeable
//////////////////// EXTENSION SETUP ///////////////////
/**
* INTERNAL API
*/
private[akka] object ServerBinding {
def apply(localAddress: InetSocketAddress, closeable: Closeable): ServerBinding =
new ServerBinding(localAddress) {
override def close() = closeable.close()
}
}
}
def apply()(implicit system: ActorSystem): HttpExt = super.apply(system)
class HttpExt(system: ExtendedActorSystem) extends Extension {
@volatile private[this] var clientPipelines = Map.empty[ClientConnectionSettings, HttpClientPipeline]
def lookup() = Http
def connect(remoteAddress: InetSocketAddress,
localAddress: Option[InetSocketAddress],
options: immutable.Traversable[Inet.SocketOption],
settings: Option[ClientConnectionSettings]): Http.OutgoingFlow = {
// FIXME #16378 Where to do logging? log.debug("Attempting connection to {}", remoteAddress)
val effectiveSettings = ClientConnectionSettings(settings)(system)
val tcpFlow = StreamTcp(system).connect(remoteAddress, localAddress, options, effectiveSettings.connectingTimeout)
val pipeline = clientPipelines.getOrElse(effectiveSettings, {
val pl = new HttpClientPipeline(effectiveSettings, system.log)(system.dispatcher)
clientPipelines = clientPipelines.updated(effectiveSettings, pl)
pl
})
pipeline(tcpFlow, remoteAddress)
}
def connect(host: String, port: Int = 80,
localAddress: Option[InetSocketAddress] = None,
options: immutable.Traversable[Inet.SocketOption] = Nil,
settings: Option[ClientConnectionSettings] = None): Http.OutgoingFlow =
connect(new InetSocketAddress(host, port), localAddress, options, settings)
def bind(endpoint: InetSocketAddress,
backlog: Int,
options: immutable.Traversable[Inet.SocketOption],
serverSettings: Option[ServerSettings]): Http.ServerSource = {
import system.dispatcher
// FIXME IdleTimeout?
val src = StreamTcp(system).bind(endpoint, backlog, options)
val key = new Key {
override type MaterializedType = Future[Http.ServerBinding]
override def materialize(map: MaterializedMap) = map.get(src).map(s Http.ServerBinding(s.localAddress, s))
}
val log = system.log
val effectiveSettings = ServerSettings(serverSettings)(system)
Http.ServerSource(src.withKey(key).map(new HttpServerPipeline(effectiveSettings, log)), key)
}
def bind(interface: String, port: Int = 80, backlog: Int = 100,
options: immutable.Traversable[Inet.SocketOption] = Nil,
serverSettings: Option[ServerSettings] = None): Http.ServerSource =
bind(new InetSocketAddress(interface, port), backlog, options, serverSettings)
}
def createExtension(system: ExtendedActorSystem): HttpExt =
new HttpExt(system.settings.config getConfig "akka.http")(system)
}

View file

@ -0,0 +1,74 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.http.engine.client
import java.net.InetSocketAddress
import scala.collection.immutable.Queue
import akka.util.ByteString
import akka.event.LoggingAdapter
import akka.stream.FlattenStrategy
import akka.stream.scaladsl._
import akka.http.model.{ HttpMethod, HttpRequest, HttpResponse }
import akka.http.engine.rendering.{ RequestRenderingContext, HttpRequestRendererFactory }
import akka.http.engine.parsing.{ HttpHeaderParser, HttpResponseParser }
import akka.http.engine.parsing.ParserOutput._
import akka.http.util._
/**
* INTERNAL API
*/
private[http] object HttpClient {
def transportToConnectionClientFlow(transport: Flow[ByteString, ByteString],
remoteAddress: InetSocketAddress,
settings: ClientConnectionSettings,
log: LoggingAdapter): Flow[HttpRequest, HttpResponse] = {
import settings._
// the initial header parser we initially use for every connection,
// will not be mutated, all "shared copy" parsers copy on first-write into the header cache
val rootParser = new HttpResponseParser(
parserSettings,
HttpHeaderParser(parserSettings) { errorInfo
if (parserSettings.illegalHeaderWarnings) log.warning(errorInfo.withSummaryPrepended("Illegal response header").formatPretty)
})
val requestRendererFactory = new HttpRequestRendererFactory(userAgentHeader, requestHeaderSizeHint, log)
val requestMethodByPass = new RequestMethodByPass(remoteAddress)
Flow[HttpRequest]
.map(requestMethodByPass)
.transform("renderer", () requestRendererFactory.newRenderer)
.flatten(FlattenStrategy.concat)
.transform("errorLogger", () errorLogger(log, "Outgoing request stream error"))
.via(transport)
.transform("rootParser", ()
// each connection uses a single (private) response parser instance for all its responses
// which builds a cache of all header instances seen on that connection
rootParser.createShallowCopy(requestMethodByPass))
.splitWhen(_.isInstanceOf[MessageStart])
.headAndTail
.collect {
case (ResponseStart(statusCode, protocol, headers, createEntity, _), entityParts)
HttpResponse(statusCode, headers, createEntity(entityParts), protocol)
}
}
// FIXME: refactor to a pure-stream design that allows us to get rid of this ad-hoc queue here
class RequestMethodByPass(serverAddress: InetSocketAddress)
extends (HttpRequest RequestRenderingContext) with (() HttpMethod) {
private[this] var requestMethods = Queue.empty[HttpMethod]
def apply(request: HttpRequest) = {
requestMethods = requestMethods.enqueue(request.method)
RequestRenderingContext(request, serverAddress)
}
def apply(): HttpMethod =
if (requestMethods.nonEmpty) {
val method = requestMethods.head
requestMethods = requestMethods.tail
method
} else HttpResponseParser.NoMethod
}
}

View file

@ -1,111 +0,0 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.http.engine.client
import java.net.InetSocketAddress
import scala.collection.immutable.Queue
import akka.stream.scaladsl._
import akka.event.LoggingAdapter
import akka.stream.FlattenStrategy
import akka.stream.io.StreamTcp
import akka.util.ByteString
import akka.http.Http
import akka.http.model.{ HttpMethod, HttpRequest, ErrorInfo, HttpResponse }
import akka.http.engine.rendering.{ RequestRenderingContext, HttpRequestRendererFactory }
import akka.http.engine.parsing.{ HttpRequestParser, HttpHeaderParser, HttpResponseParser }
import akka.http.engine.parsing.ParserOutput._
import akka.http.util._
import scala.concurrent.{ ExecutionContext, Future }
/**
* INTERNAL API
*/
private[http] class HttpClientPipeline(effectiveSettings: ClientConnectionSettings,
log: LoggingAdapter)(implicit ec: ExecutionContext)
extends ((StreamTcp.OutgoingTcpFlow, InetSocketAddress) Http.OutgoingFlow) {
import effectiveSettings._
// the initial header parser we initially use for every connection,
// will not be mutated, all "shared copy" parsers copy on first-write into the header cache
val rootParser = new HttpResponseParser(
parserSettings,
HttpHeaderParser(parserSettings) { errorInfo
if (parserSettings.illegalHeaderWarnings) log.warning(errorInfo.withSummaryPrepended("Illegal response header").formatPretty)
})
val requestRendererFactory = new HttpRequestRendererFactory(userAgentHeader, requestHeaderSizeHint, log)
def apply(tcpFlow: StreamTcp.OutgoingTcpFlow, remoteAddress: InetSocketAddress): Http.OutgoingFlow = {
import FlowGraphImplicits._
val httpKey = new HttpKey(tcpFlow.key)
val flowWithHttpKey = tcpFlow.flow.withKey(httpKey)
val requestMethodByPass = new RequestMethodByPass(remoteAddress)
val pipeline = Flow() { implicit b
val userIn = UndefinedSource[(HttpRequest, Any)]
val userOut = UndefinedSink[(HttpResponse, Any)]
val bypassFanout = Unzip[HttpRequest, Any]("bypassFanout")
val bypassFanin = Zip[HttpResponse, Any]("bypassFanin")
val requestPipeline =
Flow[HttpRequest]
.map(requestMethodByPass)
.transform("renderer", () requestRendererFactory.newRenderer)
.flatten(FlattenStrategy.concat)
.transform("errorLogger", () errorLogger(log, "Outgoing request stream error"))
val responsePipeline =
Flow[ByteString]
.transform("rootParser", ()
// each connection uses a single (private) response parser instance for all its responses
// which builds a cache of all header instances seen on that connection
rootParser.createShallowCopy(requestMethodByPass))
.splitWhen(_.isInstanceOf[MessageStart])
.headAndTail
.collect {
case (ResponseStart(statusCode, protocol, headers, createEntity, _), entityParts)
HttpResponse(statusCode, headers, createEntity(entityParts), protocol)
}
//FIXME: the graph is unnecessary after fixing #15957
userIn ~> bypassFanout.in
bypassFanout.left ~> requestPipeline ~> flowWithHttpKey ~> responsePipeline ~> bypassFanin.left
bypassFanout.right ~> bypassFanin.right
bypassFanin.out ~> userOut
userIn -> userOut
}
Http.OutgoingFlow(pipeline, httpKey)
}
class RequestMethodByPass(serverAddress: InetSocketAddress)
extends ((HttpRequest) RequestRenderingContext) with (() HttpMethod) {
private[this] var requestMethods = Queue.empty[HttpMethod]
def apply(request: HttpRequest) = {
requestMethods = requestMethods.enqueue(request.method)
RequestRenderingContext(request, serverAddress)
}
def apply(): HttpMethod =
if (requestMethods.nonEmpty) {
val method = requestMethods.head
requestMethods = requestMethods.tail
method
} else HttpResponseParser.NoMethod
}
class HttpKey(tcpKey: Key { type MaterializedType = Future[StreamTcp.OutgoingTcpConnection] }) extends Key {
type MaterializedType = Future[Http.OutgoingConnection]
override def materialize(map: MaterializedMap): MaterializedType =
map.get(tcpKey).map(tcp Http.OutgoingConnection(tcp.remoteAddress, tcp.localAddress))
}
}

View file

@ -1,28 +0,0 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.http.engine.client
import akka.http.model.{ HttpResponse, HttpRequest }
import org.reactivestreams.{ Subscription, Publisher, Subscriber, Processor }
/**
* A `HttpClientProcessor` models an HTTP client as a stream processor that provides
* responses for requests with an attached context object of a custom type,
* which is funneled through and completely transparent to the processor itself.
*/
trait HttpClientProcessor[T] extends Processor[(HttpRequest, T), (HttpResponse, T)]
object HttpClientProcessor {
def apply[T](requestSubscriber: Subscriber[(HttpRequest, T)],
responsePublisher: Publisher[(HttpResponse, T)]): HttpClientProcessor[T] =
new HttpClientProcessor[T] {
override def subscribe(s: Subscriber[_ >: (HttpResponse, T)]): Unit = responsePublisher.subscribe(s)
override def onError(t: Throwable): Unit = requestSubscriber.onError(t)
override def onSubscribe(s: Subscription): Unit = requestSubscriber.onSubscribe(s)
override def onComplete(): Unit = requestSubscriber.onComplete()
override def onNext(t: (HttpRequest, T)): Unit = requestSubscriber.onNext(t)
}
}

View file

@ -4,68 +4,40 @@
package akka.http.engine.server
import akka.actor.{ Props, ActorRef }
import akka.actor.{ ActorRef, Props }
import akka.util.ByteString
import akka.event.LoggingAdapter
import akka.stream.stage.PushPullStage
import akka.util.ByteString
import akka.stream.io.StreamTcp
import akka.stream.FlattenStrategy
import akka.stream.FlowMaterializer
import akka.stream.scaladsl._
import akka.http.engine.parsing.{ HttpHeaderParser, HttpRequestParser }
import akka.http.engine.rendering.{ ResponseRenderingContext, HttpResponseRendererFactory }
import akka.http.model._
import akka.http.engine.parsing.ParserOutput._
import akka.http.Http
import akka.http.model._
import akka.http.util._
import akka.http.Http
import scala.util.control.NonFatal
/**
* INTERNAL API
*/
private[http] class HttpServerPipeline(settings: ServerSettings, log: LoggingAdapter)
extends (StreamTcp.IncomingTcpConnection Http.IncomingConnection) {
import settings.parserSettings
private[http] object HttpServer {
// the initial header parser we initially use for every connection,
// will not be mutated, all "shared copy" parsers copy on first-write into the header cache
val rootParser = new HttpRequestParser(
parserSettings,
settings.rawRequestUriHeader,
HttpHeaderParser(parserSettings) { errorInfo
if (parserSettings.illegalHeaderWarnings) log.warning(errorInfo.withSummaryPrepended("Illegal request header").formatPretty)
})
def serverFlowToTransport(serverFlow: Flow[HttpRequest, HttpResponse],
settings: ServerSettings,
log: LoggingAdapter): Flow[ByteString, ByteString] = {
val responseRendererFactory = new HttpResponseRendererFactory(settings.serverHeader, settings.responseHeaderSizeHint, log)
// the initial header parser we initially use for every connection,
// will not be mutated, all "shared copy" parsers copy on first-write into the header cache
val rootParser = new HttpRequestParser(
settings.parserSettings,
settings.rawRequestUriHeader,
HttpHeaderParser(settings.parserSettings) { errorInfo
if (settings.parserSettings.illegalHeaderWarnings) log.warning(errorInfo.withSummaryPrepended("Illegal request header").formatPretty)
})
val bypassFanout = Broadcast[RequestOutput]("bypassFanout")
val bypassMerge = new BypassMerge
val requestPreparation =
Flow[RequestOutput]
.splitWhen(x x.isInstanceOf[MessageStart] || x == MessageEnd)
.headAndTail
.collect {
case (RequestStart(method, uri, protocol, headers, createEntity, _, _), entityParts)
val effectiveUri = HttpRequest.effectiveUri(uri, headers, securedConnection = false, settings.defaultHostHeader)
val effectiveMethod = if (method == HttpMethods.HEAD && settings.transparentHeadRequests) HttpMethods.GET else method
HttpRequest(effectiveMethod, effectiveUri, headers, createEntity(entityParts), protocol)
}
val rendererPipeline =
Flow[ResponseRenderingContext]
.transform("recover", () new ErrorsTo500ResponseRecovery(log)) // FIXME: simplify after #16394 is closed
.transform("renderer", () responseRendererFactory.newRenderer)
.flatten(FlattenStrategy.concat)
.transform("errorLogger", () errorLogger(log, "Outgoing response stream error"))
def apply(tcpConn: StreamTcp.IncomingTcpConnection): Http.IncomingConnection = {
import FlowGraphImplicits._
val userIn = UndefinedSink[HttpRequest]
val userOut = UndefinedSource[HttpResponse]
val responseRendererFactory = new HttpResponseRendererFactory(settings.serverHeader, settings.responseHeaderSizeHint, log)
@volatile var oneHundredContinueRef: Option[ActorRef] = None // FIXME: unnecessary after fixing #16168
val oneHundredContinueSource = Source[OneHundredContinue.type] {
@ -76,28 +48,50 @@ private[http] class HttpServerPipeline(settings: ServerSettings, log: LoggingAda
}
}
// FIXME The whole pipeline can maybe be created up front when #16168 is fixed
val pipeline = Flow() { implicit b
val bypassFanout = Broadcast[RequestOutput]("bypassFanout")
val bypassMerge = new BypassMerge(settings, log)
val requestParsing = Flow[ByteString].transform("rootParser", ()
// each connection uses a single (private) request parser instance for all its requests
// which builds a cache of all header instances seen on that connection
rootParser.createShallowCopy(() oneHundredContinueRef))
val requestParsing = Flow[ByteString].transform("rootParser", ()
// each connection uses a single (private) request parser instance for all its requests
// which builds a cache of all header instances seen on that connection
rootParser.createShallowCopy(() oneHundredContinueRef))
val requestPreparation =
Flow[RequestOutput]
.splitWhen(x x.isInstanceOf[MessageStart] || x == MessageEnd)
.headAndTail
.collect {
case (RequestStart(method, uri, protocol, headers, createEntity, _, _), entityParts)
val effectiveUri = HttpRequest.effectiveUri(uri, headers, securedConnection = false, settings.defaultHostHeader)
val effectiveMethod = if (method == HttpMethods.HEAD && settings.transparentHeadRequests) HttpMethods.GET else method
HttpRequest(effectiveMethod, effectiveUri, headers, createEntity(entityParts), protocol)
}
val rendererPipeline =
Flow[ResponseRenderingContext]
.transform("recover", () new ErrorsTo500ResponseRecovery(log)) // FIXME: simplify after #16394 is closed
.transform("renderer", () responseRendererFactory.newRenderer)
.flatten(FlattenStrategy.concat)
.transform("errorLogger", () errorLogger(log, "Outgoing response stream error"))
val transportIn = UndefinedSource[ByteString]
val transportOut = UndefinedSink[ByteString]
import FlowGraphImplicits._
Flow() { implicit b
//FIXME: the graph is unnecessary after fixing #15957
userOut ~> bypassMerge.applicationInput ~> rendererPipeline ~> tcpConn.stream ~> requestParsing ~> bypassFanout ~> requestPreparation ~> userIn
transportIn ~> requestParsing ~> bypassFanout ~> requestPreparation ~> serverFlow ~> bypassMerge.applicationInput ~> rendererPipeline ~> transportOut
bypassFanout ~> bypassMerge.bypassInput
oneHundredContinueSource ~> bypassMerge.oneHundredContinueInput
b.allowCycles()
userOut -> userIn
transportIn -> transportOut
}
Http.IncomingConnection(tcpConn.remoteAddress, pipeline)
}
class BypassMerge extends FlexiMerge[ResponseRenderingContext]("BypassMerge") {
class BypassMerge(settings: ServerSettings, log: LoggingAdapter) extends FlexiMerge[ResponseRenderingContext]("BypassMerge") {
import FlexiMerge._
val bypassInput = createInputPort[RequestOutput]()
val oneHundredContinueInput = createInputPort[OneHundredContinue.type]()

View file

@ -12,6 +12,7 @@ import scala.concurrent.Await
import scala.concurrent.duration._
import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpec }
import akka.actor.ActorSystem
import akka.stream.io.StreamTcp
import akka.stream.FlowMaterializer
import akka.stream.testkit.StreamTestKit
import akka.stream.testkit.StreamTestKit.{ PublisherProbe, SubscriberProbe }
@ -36,45 +37,39 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll {
"The server-side HTTP infrastructure" should {
"properly bind and unbind a server" in {
"properly bind a server" in {
val (hostname, port) = temporaryServerHostnameAndPort()
val Http.ServerSource(source, key) = Http(system).bind(hostname, port)
val c = StreamTestKit.SubscriberProbe[Http.IncomingConnection]()
val mm = source.to(Sink(c)).run()
val Http.ServerBinding(localAddress) = Await.result(mm.get(key), 3.seconds)
val sub = c.expectSubscription()
localAddress.getHostName shouldEqual hostname
localAddress.getPort shouldEqual port
val binding = Http().bind(hostname, port)
val probe = StreamTestKit.SubscriberProbe[Http.IncomingConnection]()
binding.connections.runWith(Sink(probe))
val sub = probe.expectSubscription() // if we get it we are bound
sub.cancel()
// TODO: verify unbinding effect
}
"report failure if bind fails" in {
"report failure if bind fails" in pendingUntilFixed { // FIXME: "unpend"!
val (hostname, port) = temporaryServerHostnameAndPort()
val Http.ServerSource(source, key) = Http(system).bind(hostname, port)
val c1 = StreamTestKit.SubscriberProbe[Http.IncomingConnection]()
val mm1 = source.to(Sink(c1)).run()
val sub = c1.expectSubscription()
val Http.ServerBinding(localAddress) = Await.result(mm1.get(key), 3.seconds)
localAddress.getHostName shouldEqual hostname
localAddress.getPort shouldEqual port
val c2 = StreamTestKit.SubscriberProbe[Http.IncomingConnection]()
val mm2 = source.to(Sink(c2)).run()
val failure = intercept[akka.stream.io.StreamTcp.IncomingTcpException] {
val serverBinding = Await.result(mm2.get(key), 3.seconds)
}
failure.getMessage should be("Bind failed")
sub.cancel()
val binding = Http().bind(hostname, port)
val probe1 = StreamTestKit.SubscriberProbe[Http.IncomingConnection]()
val mm1 = binding.connections.to(Sink(probe1)).run()
probe1.expectSubscription()
val probe2 = StreamTestKit.SubscriberProbe[Http.IncomingConnection]()
binding.connections.runWith(Sink(probe2))
probe2.expectError(StreamTcp.BindFailedException)
Await.result(binding.unbind(mm1), 1.second)
val probe3 = StreamTestKit.SubscriberProbe[Http.IncomingConnection]()
val mm3 = binding.connections.to(Sink(probe3)).run()
probe3.expectSubscription() // we bound a second time, which means the previous unbind was successful
Await.result(binding.unbind(mm3), 1.second)
}
"properly complete a simple request/response cycle" in new TestSetup {
val (clientOut, clientIn) = openNewClientConnection[Symbol]()
val (clientOut, clientIn) = openNewClientConnection()
val (serverIn, serverOut) = acceptConnection()
val clientOutSub = clientOut.expectSubscription()
clientOutSub.sendNext(HttpRequest(uri = "/abc") -> 'abcContext)
clientOutSub.sendNext(HttpRequest(uri = "/abc"))
val serverInSub = serverIn.expectSubscription()
serverInSub.request(1)
@ -85,12 +80,12 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll {
val clientInSub = clientIn.expectSubscription()
clientInSub.request(1)
val (response, 'abcContext) = clientIn.expectNext()
val response = clientIn.expectNext()
toStrict(response.entity) shouldEqual HttpEntity("yeah")
}
"properly complete a chunked request/response cycle" in new TestSetup {
val (clientOut, clientIn) = openNewClientConnection[Long]()
val (clientOut, clientIn) = openNewClientConnection()
val (serverIn, serverOut) = acceptConnection()
val chunks = List(Chunk("abc"), Chunk("defg"), Chunk("hijkl"), LastChunk)
@ -98,7 +93,7 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll {
val chunkedEntity = HttpEntity.Chunked(chunkedContentType, Source(chunks))
val clientOutSub = clientOut.expectSubscription()
clientOutSub.sendNext(HttpRequest(POST, "/chunked", List(Accept(MediaRanges.`*/*`)), chunkedEntity) -> 12345678)
clientOutSub.sendNext(HttpRequest(POST, "/chunked", List(Accept(MediaRanges.`*/*`)), chunkedEntity))
val serverInSub = serverIn.expectSubscription()
serverInSub.request(1)
@ -112,8 +107,8 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll {
val clientInSub = clientIn.expectSubscription()
clientInSub.request(1)
val (HttpResponse(StatusCodes.PartialContent, List(Date(_), Server(_), RawHeader("Age", "42")),
Chunked(`chunkedContentType`, chunkStream2), HttpProtocols.`HTTP/1.1`), 12345678) = clientIn.expectNext()
val HttpResponse(StatusCodes.PartialContent, List(Date(_), Server(_), RawHeader("Age", "42")),
Chunked(`chunkedContentType`, chunkStream2), HttpProtocols.`HTTP/1.1`) = clientIn.expectNext()
Await.result(chunkStream2.grouped(1000).runWith(Sink.head), 100.millis) shouldEqual chunks
}
@ -126,35 +121,35 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll {
def configOverrides = ""
// automatically bind a server
val connectionStream: SubscriberProbe[Http.IncomingConnection] = {
val connSource = {
val settings = configOverrides.toOption.map(ServerSettings.apply)
val Http.ServerSource(source, key) = Http(system).bind(hostname, port, serverSettings = settings)
val binding = Http().bind(hostname, port, settings = settings)
val probe = StreamTestKit.SubscriberProbe[Http.IncomingConnection]
source.to(Sink(probe)).run()
binding.connections.runWith(Sink(probe))
probe
}
val connectionStreamSub = connectionStream.expectSubscription()
val connSourceSub = connSource.expectSubscription()
def openNewClientConnection[T](settings: Option[ClientConnectionSettings] = None): (PublisherProbe[(HttpRequest, T)], SubscriberProbe[(HttpResponse, T)]) = {
val outgoingFlow = Http(system).connect(hostname, port, settings = settings)
val requestPublisherProbe = StreamTestKit.PublisherProbe[(HttpRequest, T)]()
val responseSubscriberProbe = StreamTestKit.SubscriberProbe[(HttpResponse, T)]()
val tflow = outgoingFlow.flow.asInstanceOf[Flow[((HttpRequest, T)), ((HttpResponse, T))]]
val mm = Flow(Sink(responseSubscriberProbe), Source(requestPublisherProbe)).join(tflow).run()
val connection = Await.result(mm.get(outgoingFlow.key), 3.seconds)
connection.remoteAddress.getPort shouldEqual port
def openNewClientConnection(settings: Option[ClientConnectionSettings] = None): (PublisherProbe[HttpRequest], SubscriberProbe[HttpResponse]) = {
val requestPublisherProbe = StreamTestKit.PublisherProbe[HttpRequest]()
val responseSubscriberProbe = StreamTestKit.SubscriberProbe[HttpResponse]()
val connection = Http().outgoingConnection(hostname, port, settings = settings)
connection.remoteAddress.getHostName shouldEqual hostname
connection.remoteAddress.getPort shouldEqual port
Source(requestPublisherProbe).via(connection.flow).runWith(Sink(responseSubscriberProbe))
requestPublisherProbe -> responseSubscriberProbe
}
def acceptConnection(): (SubscriberProbe[HttpRequest], PublisherProbe[HttpResponse]) = {
connectionStreamSub.request(1)
val Http.IncomingConnection(remoteAddress, flow) = connectionStream.expectNext()
connSourceSub.request(1)
val incomingConnection = connSource.expectNext()
val sink = PublisherSink[HttpRequest]()
val source = SubscriberSource[HttpResponse]()
val mm = incomingConnection.handleWith(Flow(sink, source))
val requestSubscriberProbe = StreamTestKit.SubscriberProbe[HttpRequest]()
val responsePublisherProbe = StreamTestKit.PublisherProbe[HttpResponse]()
Flow(Sink(requestSubscriberProbe), Source(responsePublisherProbe)).join(flow).run()
mm.get(sink).subscribe(requestSubscriberProbe)
responsePublisherProbe.subscribe(mm.get(source))
requestSubscriberProbe -> responsePublisherProbe
}

View file

@ -17,15 +17,15 @@ object TestClient extends App {
akka.log-dead-letters = off
""")
implicit val system = ActorSystem("ServerTest", testConf)
import akka.http.TestClient.system.dispatcher
implicit val fm = FlowMaterializer()
import system.dispatcher
implicit val materializer = FlowMaterializer()
val host = "spray.io"
println(s"Fetching HTTP server version of host `$host` ...")
val outgoingFlow = Http(system).connect(host)
val result = Source.singleton(HttpRequest() -> 'NoContext).via(outgoingFlow.flow).map(_._1).runWith(Sink.head)
val connection = Http().outgoingConnection(host)
val result = Source.singleton(HttpRequest()).via(connection.flow).runWith(Sink.head)
result.map(_.header[headers.Server]) onComplete {
case Success(res) println(s"$host is running ${res mkString ", "}")

View file

@ -7,11 +7,9 @@ package akka.http
import akka.actor.ActorSystem
import akka.http.model._
import akka.stream.FlowMaterializer
import akka.stream.scaladsl.{ Flow, Sink }
import akka.stream.scaladsl.Flow
import com.typesafe.config.{ ConfigFactory, Config }
import HttpMethods._
import scala.concurrent.Await
import scala.concurrent.duration._
object TestServer extends App {
val testConf: Config = ConfigFactory.parseString("""
@ -19,30 +17,26 @@ object TestServer extends App {
akka.log-dead-letters = off
""")
implicit val system = ActorSystem("ServerTest", testConf)
implicit val fm = FlowMaterializer()
val requestHandler: HttpRequest HttpResponse = {
case HttpRequest(GET, Uri.Path("/"), _, _, _) index
case HttpRequest(GET, Uri.Path("/ping"), _, _, _) HttpResponse(entity = "PONG!")
case HttpRequest(GET, Uri.Path("/crash"), _, _, _) sys.error("BOOM!")
case _: HttpRequest HttpResponse(404, entity = "Unknown resource!")
val binding = Http().bind(interface = "localhost", port = 8080)
for (connection binding.connections) {
println("Accepted new connection from " + connection.remoteAddress)
connection handleWith {
Flow[HttpRequest] map {
case HttpRequest(GET, Uri.Path("/"), _, _, _) index
case HttpRequest(GET, Uri.Path("/ping"), _, _, _) HttpResponse(entity = "PONG!")
case HttpRequest(GET, Uri.Path("/crash"), _, _, _) sys.error("BOOM!")
case _: HttpRequest HttpResponse(404, entity = "Unknown resource!")
}
}
}
implicit val materializer = FlowMaterializer()
val Http.ServerSource(source, key) = Http(system).bind(interface = "localhost", port = 8080)
val materializedMap = source.to(Sink.foreach {
case Http.IncomingConnection(remoteAddress, flow)
println("Accepted new connection from " + remoteAddress)
flow.join(Flow[HttpRequest].map(requestHandler)).run()
}).run()
val serverBinding = Await.result(materializedMap.get(key), 3 seconds)
println(s"Server online at http://${serverBinding.localAddress.getHostName}:${serverBinding.localAddress.getPort}")
println(s"Server online at http://localhost:8080")
println("Press RETURN to stop...")
Console.readLine()
serverBinding.close()
system.shutdown()
////////////// helpers //////////////

View file

@ -10,9 +10,7 @@ import akka.event.NoLogging
import akka.util.ByteString
import akka.stream.scaladsl._
import akka.stream.FlowMaterializer
import akka.stream.io.StreamTcp
import akka.stream.testkit.{ AkkaSpec, StreamTestKit }
import akka.http.Http
import akka.http.model._
import akka.http.util._
import headers._
@ -20,7 +18,7 @@ import HttpEntity._
import MediaTypes._
import HttpMethods._
class HttpServerPipelineSpec extends AkkaSpec with Matchers with BeforeAndAfterAll with Inside {
class HttpServerSpec extends AkkaSpec with Matchers with BeforeAndAfterAll with Inside {
implicit val materializer = FlowMaterializer()
"The server implementation" should {
@ -612,14 +610,18 @@ class HttpServerPipelineSpec extends AkkaSpec with Matchers with BeforeAndAfterA
}
class TestSetup {
val netIn = StreamTestKit.PublisherProbe[ByteString]
val netOut = StreamTestKit.SubscriberProbe[ByteString]
val tcpConnection = StreamTcp.IncomingTcpConnection(null, Flow(Sink(netOut), Source(netIn)))
val requests = StreamTestKit.SubscriberProbe[HttpRequest]
val responses = StreamTestKit.PublisherProbe[HttpResponse]
def settings = ServerSettings(system).copy(serverHeader = Some(Server(List(ProductVersion("akka-http", "test")))))
val pipeline = new HttpServerPipeline(settings, NoLogging)
val Http.IncomingConnection(_, httpPipelineFlow) = pipeline(tcpConnection)
val (netIn, netOut) = {
val netIn = StreamTestKit.PublisherProbe[ByteString]
val netOut = StreamTestKit.SubscriberProbe[ByteString]
val transportFlow = HttpServer.serverFlowToTransport(Flow(Sink(requests), Source(responses)), settings, NoLogging)
Source(netIn).via(transportFlow).runWith(Sink(netOut))
netIn -> netOut
}
def wipeDate(string: String) =
string.fastSplit('\n').map {
@ -627,10 +629,6 @@ class HttpServerPipelineSpec extends AkkaSpec with Matchers with BeforeAndAfterA
case s s
}.mkString("\n")
val requests = StreamTestKit.SubscriberProbe[HttpRequest]
val responses = StreamTestKit.PublisherProbe[HttpResponse]
Flow(Sink(requests), Source(responses)).join(httpPipelineFlow).run()
val netInSub = netIn.expectSubscription()
val netOutSub = netOut.expectSubscription()
val requestsSub = requests.expectSubscription()
@ -647,4 +645,4 @@ class HttpServerPipelineSpec extends AkkaSpec with Matchers with BeforeAndAfterA
def closeNetworkInput(): Unit = netInSub.sendComplete()
}
}
}