!hco Add client-side request rendering and response parsing, refactor, strip-down

Remove everything that is out of scope for akka-http-core M1.
Things like the host- and request-level client-side API will be added back later.
Mathias 2014-06-13 15:31:57 +02:00
parent 8b81738d24
commit db13edbd55
37 changed files with 1391 additions and 1128 deletions

View file

@ -8,72 +8,20 @@
akka.http {
server {
# The value of the `Server` header to produce.
# Set to the empty string to disable rendering of the server header.
# The default value of the `Server` header to produce if no
# explicit `Server`-header was included in a response.
# If this value is the empty string and no header was included in
# the request, no `Server` header will be rendered at all.
server-header = akka-http/${akka.version}
# Enables/disables SSL encryption.
# If enabled the server uses the implicit `ServerSSLEngineProvider` member
# of the `Bind` command to create `SSLEngine` instances for the underlying
# IO connection.
ssl-encryption = off
# The maximum number of requests that are accepted (and dispatched to
# the application) on one single connection before the first request
# has to be completed.
# Incoming requests that would cause the pipelining limit to be exceeded
# are not read from the connection's socket so as to build up "back-pressure"
# to the client via TCP flow control.
# A setting of 1 disables HTTP pipelining, since only one request per
# connection can be "open" (i.e. being processed by the application) at any
# time. Set to higher values to enable HTTP pipelining.
# Set to 'disabled' for completely disabling pipelining limits
# (not recommended on public-facing servers due to risk of DoS attacks).
# This value must be > 0 and <= 128.
pipelining-limit = 1
# The time after which an idle connection will be automatically closed.
# Set to `infinite` to completely disable idle connection timeouts.
idle-timeout = 60 s
# If a request hasn't been responded to after the time period set here
# an `akka.http.Timedout` message will be sent to the timeout handler.
# Set to `infinite` to completely disable request timeouts.
request-timeout = 20 s
# After a `Timedout` message has been sent to the timeout handler and the
# request still hasn't been completed after the time period set here
# the server will complete the request itself with an error response.
# Set to `infinite` to disable this timeout.
timeout-timeout = 2 s
# The path of the actor to send `akka.http.Timedout` messages to.
# If empty all `Timedout` messages will go to the "regular" request
# handling actor.
timeout-handler = ""
# The time period within which the TCP binding process must be completed.
# Set to `infinite` to disable.
bind-timeout = 1s
# The time period within which the TCP unbinding process must be completed.
# Set to `infinite` to disable.
unbind-timeout = 1s
# The time after which a connection is aborted (RST) after a parsing error
# occurred. The timeout prevents a connection which is already known to be
# erroneous from receiving ever more data even if all of the data will be ignored.
# However, in case of a connection abortion the client usually doesn't properly
# receive the error response. This timeout is a trade-off which allows the client
# some time to finish its request and receive a proper error response before the
# connection is forcibly closed to free resources.
parse-error-abort-timeout = 2s
# The "granularity" of timeout checking for both idle connections timeouts
# as well as request timeouts, should rarely be needed to modify.
# If set to `infinite` request and connection timeout checking is disabled.
reaping-cycle = 250 ms
# Enables/disables the addition of a `Remote-Address` header
# holding the client's (remote) IP address.
remote-address-header = off
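(For illustration: an application would typically tune a few of these server defaults in its own application.conf. A minimal sketch with made-up values, using only the keys shown above:)

akka.http.server {
  server-header = "my-service/1.0"
  request-timeout = 30 s
  pipelining-limit = 8
  remote-address-header = on
}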
@ -103,12 +51,6 @@ akka.http {
# doesn't have to be fiddled with in most applications.
response-header-size-hint = 512
# For HTTPS connections this setting specifies the maximum number of
# bytes that are encrypted in one go. Large responses are broken down into
# chunks of this size so as to already begin sending before the response has
# been encrypted entirely.
max-encryption-chunk-size = 1m
# If this setting is empty the server only accepts requests that carry a
# non-empty `Host` header and responds to all others with `400 Bad Request`.
# Set to a non-empty value to be used in lieu of a missing or empty `Host`
@ -131,45 +73,17 @@ akka.http {
user-agent-header = akka-http/${akka.version}
# The time period within which the TCP connecting process must be completed.
# Set to `infinite` to disable.
connecting-timeout = 10s
# The time after which an idle connection will be automatically closed.
# Set to `infinite` to completely disable idle timeouts.
idle-timeout = 60 s
# The max time period that a client connection will be waiting for a response
# before triggering a request timeout. The timer for this logic is not started
# until the connection is actually in a state to receive the response, which
# may be quite some time after the request has been received from the
# application!
# There are two main reasons to delay the start of the request timeout timer:
# 1. On the host-level API with pipelining disabled:
# If the request cannot be sent immediately because all connections are
# currently busy with earlier requests it has to be queued until a
# connection becomes available.
# 2. With pipelining enabled:
# The request timeout timer starts only once the response for the
# preceding request on the connection has arrived.
# Set to `infinite` to completely disable request timeouts.
request-timeout = 20 s
# the "granularity" of timeout checking for both idle connections timeouts
# as well as request timeouts, should rarely be needed to modify.
# If set to `infinite` request and connection timeout checking is disabled.
reaping-cycle = 250 ms
# The initial size of the buffer to render the request headers in.
# Can be used for fine-tuning request rendering performance but probably
# doesn't have to be fiddled with in most applications.
request-header-size-hint = 512
# For HTTPS connections this setting specifies the maximum number of
# bytes that are encrypted in one go. Large requests are broken down into
# chunks of this size so as to already begin sending before the request has
# been encrypted entirely.
max-encryption-chunk-size = 1m
# The proxy configurations to be used for requests with the specified
# scheme.
proxy {
@ -193,37 +107,6 @@ akka.http {
parsing = ${akka.http.parsing}
}
host-connector {
# The maximum number of parallel connections that an `HttpHostConnector`
# is allowed to establish to a host. Must be > 0.
max-connections = 4
# The maximum number of times an `HttpHostConnector` attempts to repeat
# failed requests (if the request can be safely retried) before
# giving up and returning an error.
max-retries = 5
# Configures redirection following.
# If set to zero redirection responses will not be followed, i.e. they'll be returned to the user as is.
# If set to a value > zero redirection responses will be followed up to the given number of times.
# If the redirection chain is longer than the configured value, the first redirection response
# that is not followed anymore is returned to the user as is.
max-redirects = 0
# If this setting is enabled, the `HttpHostConnector` pipelines requests
# across connections, otherwise only one single request can be "open"
# on a particular HTTP connection.
pipelining = off
# The time after which an idle `HttpHostConnector` (without open
# connections) will automatically terminate itself.
# Set to `infinite` to completely disable idle timeouts.
idle-timeout = 30 s
# Modify to tweak client settings for this host-connector only.
client = ${akka.http.client}
}
# The (default) configuration of the HTTP message parser for the server and the client.
# IMPORTANT: These settings (i.e. children of `akka.http.parsing`) can't be directly
# overridden in `application.conf` to change the parser settings for client and server
@ -265,11 +148,6 @@ akka.http {
# provided as a `RawHeader`.
illegal-header-warnings = on
# Enables/disables inclusion of an SSL-Session-Info header in parsed
# messages over SSL transports (i.e., HttpRequest on server side and
# HttpResponse on client side).
ssl-session-info-header = off
# limits for the number of different values per header type that the
# header cache will hold
header-cache {
@ -288,20 +166,4 @@ akka.http {
# Fully qualified config path which holds the dispatcher configuration
# to be used for the HttpManager.
manager-dispatcher = "akka.actor.default-dispatcher"
# Fully qualified config path which holds the dispatcher configuration
# to be used for the HttpClientSettingsGroup actors.
settings-group-dispatcher = "akka.actor.default-dispatcher"
# Fully qualified config path which holds the dispatcher configuration
# to be used for the HttpHostConnector actors.
host-connector-dispatcher = "akka.actor.default-dispatcher"
# Fully qualified config path which holds the dispatcher configuration
# to be used for HttpListener actors.
listener-dispatcher = "akka.actor.default-dispatcher"
# Fully qualified config path which holds the dispatcher configuration
# to be used for HttpServerConnection and HttpClientConnection actors.
connection-dispatcher = "akka.actor.default-dispatcher"
}
}

View file

@ -7,13 +7,12 @@ package akka.http
import java.net.InetSocketAddress
import com.typesafe.config.Config
import org.reactivestreams.api.{ Producer, Consumer }
import scala.concurrent.duration.Duration
import scala.collection.immutable
import akka.io.{ Inet, Tcp }
import akka.io.Inet
import akka.stream.MaterializerSettings
import akka.http.client.{ HostConnectorSettings, ClientConnectionSettings }
import akka.http.client.{ HttpClientProcessor, ClientConnectionSettings }
import akka.http.server.ServerSettings
import akka.http.model.{ HttpResponse, HttpRequest, HttpHeader }
import akka.http.model.{ HttpResponse, HttpRequest }
import akka.http.util._
import akka.actor._
@ -22,63 +21,81 @@ object Http extends ExtensionKey[HttpExt] {
/**
* Command that can be sent to `IO(Http)` to trigger the setup of an HTTP client facility at
* a certain API level (connection, host or request).
* The HTTP layer will respond with an `OutgoingHttpChannelInfo` reply (or `Status.Failure`).
* The HTTP layer will respond with an `Http.OutgoingChannel` reply (or `Status.Failure`).
* The sender `ActorRef` of this response can then be sent `HttpRequest` instances to which
* it will respond with `HttpResponse` instances (or `Status.Failure`).
*/
sealed trait OutgoingHttpChannelSetup
sealed trait SetupOutgoingChannel
final case class Connect(remoteAddress: InetSocketAddress,
sslEncryption: Boolean,
localAddress: Option[InetSocketAddress],
options: immutable.Traversable[Inet.SocketOption],
settings: Option[ClientConnectionSettings]) extends OutgoingHttpChannelSetup
settings: Option[ClientConnectionSettings],
materializerSettings: MaterializerSettings) extends SetupOutgoingChannel
object Connect {
def apply(host: String, port: Int = 80, sslEncryption: Boolean = false, localAddress: Option[InetSocketAddress] = None,
options: immutable.Traversable[Inet.SocketOption] = Nil, settings: Option[ClientConnectionSettings] = None): Connect =
apply(new InetSocketAddress(host, port), sslEncryption, localAddress, options, settings)
def apply(host: String, port: Int = 80,
localAddress: Option[InetSocketAddress] = None,
options: immutable.Traversable[Inet.SocketOption] = Nil,
settings: Option[ClientConnectionSettings] = None,
materializerSettings: MaterializerSettings = MaterializerSettings()): Connect =
apply(new InetSocketAddress(host, port), localAddress, options, settings, materializerSettings)
}
case class HostConnectorSetup(host: String, port: Int = 80,
sslEncryption: Boolean = false,
options: immutable.Traversable[Inet.SocketOption] = Nil,
settings: Option[HostConnectorSettings] = None,
connectionType: ClientConnectionType = ClientConnectionType.AutoProxied,
defaultHeaders: immutable.Seq[HttpHeader] = Nil) extends OutgoingHttpChannelSetup {
private[http] def normalized(implicit refFactory: ActorRefFactory) =
if (settings.isDefined) this
else copy(settings = Some(HostConnectorSettings(actorSystem)))
}
object HostConnectorSetup {
def apply(host: String, port: Int, sslEncryption: Boolean)(implicit refFactory: ActorRefFactory): HostConnectorSetup =
apply(host, port, sslEncryption).normalized
}
// PREVIEW OF COMING API HERE:
//
// case class SetupHostConnector(host: String, port: Int = 80,
// options: immutable.Traversable[Inet.SocketOption] = Nil,
// settings: Option[HostConnectorSettings] = None,
// connectionType: ClientConnectionType = ClientConnectionType.AutoProxied,
// defaultHeaders: immutable.Seq[HttpHeader] = Nil) extends SetupOutgoingChannel {
// private[http] def normalized(implicit refFactory: ActorRefFactory) =
// if (settings.isDefined) this
// else copy(settings = Some(HostConnectorSettings(actorSystem)))
// }
// object SetupHostConnector {
// def apply(host: String, port: Int, sslEncryption: Boolean)(implicit refFactory: ActorRefFactory): SetupHostConnector =
// apply(host, port, sslEncryption).normalized
// }
// sealed trait ClientConnectionType
// object ClientConnectionType {
// object Direct extends ClientConnectionType
// object AutoProxied extends ClientConnectionType
// final case class Proxied(proxyHost: String, proxyPort: Int) extends ClientConnectionType
// }
//
// case object SetupRequestChannel extends SetupOutgoingChannel
case object HttpRequestChannelSetup extends OutgoingHttpChannelSetup
final case class OpenOutgoingHttpChannel(channelSetup: OutgoingHttpChannelSetup)
sealed trait OutgoingChannel {
def processor[T]: HttpClientProcessor[T]
}
/**
* Command triggering the shutdown of the respective HTTP channel.
*
* If sent to
* - client-side connection actors: triggers the closing of the connection
* - host-connector actors: triggers the closing of all connections and the shutdown of the host-connector
* - the `HttpManager` actor: triggers the closing of all outgoing and incoming connections, the shutdown of all
* host-connectors and the unbinding of all servers
* An `OutgoingHttpChannel` with a single outgoing HTTP connection as the underlying transport.
*/
type CloseCommand = Tcp.CloseCommand
val Close = Tcp.Close
val ConfirmedClose = Tcp.ConfirmedClose
val Abort = Tcp.Abort
sealed trait ClientConnectionType
object ClientConnectionType {
object Direct extends ClientConnectionType
object AutoProxied extends ClientConnectionType
final case class Proxied(proxyHost: String, proxyPort: Int) extends ClientConnectionType
final case class OutgoingConnection(remoteAddress: InetSocketAddress,
localAddress: InetSocketAddress,
untypedProcessor: HttpClientProcessor[Any]) extends OutgoingChannel {
def processor[T] = untypedProcessor.asInstanceOf[HttpClientProcessor[T]]
}
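For orientation, connection-level usage could then look roughly as follows. This is only a sketch: the implicit timeout/dispatcher handling and the `String` context type are assumptions, not part of this commit.

import scala.concurrent.duration._
import akka.actor.ActorSystem
import akka.io.IO
import akka.pattern.ask
import akka.util.Timeout
import akka.http.Http

implicit val system = ActorSystem("client")
import system.dispatcher
implicit val timeout: Timeout = 5.seconds

// ask the HTTP extension for a connection-level channel
val connected =
  IO(Http).ask(Http.Connect("example.com", port = 80)).mapTo[Http.OutgoingConnection]

connected.foreach { conn =>
  // the processor consumes (HttpRequest, ctx) pairs and produces (HttpResponse, ctx) pairs
  val processor = conn.processor[String]
  // wire a Producer[(HttpRequest, String)] into `processor` and subscribe a
  // Consumer[(HttpResponse, String)] to it, e.g. via the akka-stream Flow/Duct API
}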
// PREVIEW OF COMING API HERE:
//
// /**
// * An `OutgoingHttpChannel` with a connection pool to a specific host/port as the underlying transport.
// */
// final case class HostChannel(host: String, port: Int,
// untypedProcessor: HttpClientProcessor[Any]) extends OutgoingChannel {
// def processor[T] = untypedProcessor.asInstanceOf[HttpClientProcessor[T]]
// }
//
// /**
// * A general `OutgoingHttpChannel` with connection pools to all possible host/port combinations
// * as the underlying transport.
// */
// final case class RequestChannel(untypedProcessor: HttpClientProcessor[Any]) extends OutgoingChannel {
// def processor[T] = untypedProcessor.asInstanceOf[HttpClientProcessor[T]]
// }
final case class Bind(endpoint: InetSocketAddress,
backlog: Int,
options: immutable.Traversable[Inet.SocketOption],
@ -92,38 +109,6 @@ object Http extends ExtensionKey[HttpExt] {
apply(new InetSocketAddress(interface, port), backlog, options, serverSettings, materializerSettings)
}
case class Unbind(timeout: Duration)
object Unbind extends Unbind(Duration.Zero)
type ConnectionClosed = Tcp.ConnectionClosed
val Closed = Tcp.Closed
val Aborted = Tcp.Aborted
val ConfirmedClosed = Tcp.ConfirmedClosed
val PeerClosed = Tcp.PeerClosed
type ErrorClosed = Tcp.ErrorClosed; val ErrorClosed = Tcp.ErrorClosed
/**
* Response to an `OutgoingHttpChannelSetup` command (in the success case).
* The `transport` actor can be sent `HttpRequest` instances which will be responded
* to with the respective `HttpResponse` instance as soon as the start of which has
* been received.
* The sender of the `OutgoingHttpChannelInfo` response is always identical to the transport.
*/
sealed trait OutgoingHttpChannelInfo {
def transport: ActorRef
}
final case class Connected(transport: ActorRef,
remoteAddress: InetSocketAddress,
localAddress: InetSocketAddress) extends OutgoingHttpChannelInfo
final case class HostConnectorInfo(transport: ActorRef,
setup: HostConnectorSetup) extends OutgoingHttpChannelInfo
final case class HttpRequestChannelInfo(transport: ActorRef) extends OutgoingHttpChannelInfo
///////////////////// server-side events ////////////////////////
final case class ServerBinding(localAddress: InetSocketAddress,
connectionStream: Producer[IncomingConnection])
@ -131,15 +116,11 @@ object Http extends ExtensionKey[HttpExt] {
requestProducer: Producer[HttpRequest],
responseConsumer: Consumer[HttpResponse])
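Correspondingly, a server could be bound and its connections handled roughly like this. Again only a sketch: the string-based `Http.Bind` defaults, the `Flow.foreach`/`consume` combinators (early akka-stream API), and the canned `HttpResponse()` handler are assumptions.

import scala.concurrent.duration._
import akka.actor.ActorSystem
import akka.io.IO
import akka.pattern.ask
import akka.util.Timeout
import akka.stream.{ FlowMaterializer, MaterializerSettings }
import akka.stream.scaladsl.Flow
import akka.http.Http
import akka.http.model.HttpResponse

implicit val system = ActorSystem("server")
import system.dispatcher
implicit val timeout: Timeout = 5.seconds
val materializer = FlowMaterializer(MaterializerSettings())

val bound = IO(Http).ask(Http.Bind("localhost", port = 8080)).mapTo[Http.ServerBinding]

bound.foreach { binding =>
  Flow(binding.connectionStream)
    .foreach { conn =>
      // answer every request on this connection with an empty 200 OK
      Flow(conn.requestProducer)
        .map(_ => HttpResponse())
        .produceTo(materializer, conn.responseConsumer)
    }
    .consume(materializer)
}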
val Unbound = Tcp.Unbound
case object BindFailedException extends SingletonException
case object UnbindFailedException extends SingletonException
class ConnectionException(message: String) extends RuntimeException(message)
class ConnectionAttemptFailedException(val host: String, val port: Int) extends ConnectionException(s"Connection attempt to $host:$port failed")
class ConnectionAttemptFailedException(val endpoint: InetSocketAddress) extends ConnectionException(s"Connection attempt to $endpoint failed")
class RequestTimeoutException(val request: HttpRequest, message: String) extends ConnectionException(message)
}
@ -148,10 +129,6 @@ class HttpExt(system: ExtendedActorSystem) extends akka.io.IO.Extension {
val Settings = new Settings(system.settings.config getConfig "akka.http")
class Settings private[HttpExt] (config: Config) {
val ManagerDispatcher = config getString "manager-dispatcher"
val SettingsGroupDispatcher = config getString "settings-group-dispatcher"
val HostConnectorDispatcher = config getString "host-connector-dispatcher"
val ListenerDispatcher = config getString "listener-dispatcher"
val ConnectionDispatcher = config getString "connection-dispatcher"
}
val manager = system.actorOf(props = HttpManager.props(Settings), name = "IO-HTTP")

View file

@ -4,13 +4,17 @@
package akka.http
import scala.util.control.NonFatal
import scala.collection.immutable
import akka.io.Inet
import akka.http.model.{ HttpHeader, Uri, HttpRequest }
import akka.http.server.HttpListener
import scala.util.{ Failure, Success }
import scala.concurrent.duration._
import akka.io.IO
import akka.util.Timeout
import akka.stream.io.StreamTcp
import akka.stream.FlowMaterializer
import akka.http.client._
import akka.actor._
import akka.http.server.{ HttpServerPipeline, ServerSettings }
import akka.pattern.ask
import akka.stream.scaladsl.Flow
/**
* INTERNAL API
@ -18,232 +22,62 @@ import akka.actor._
* The gateway actor into the low-level HTTP layer.
*/
private[http] class HttpManager(httpSettings: HttpExt#Settings) extends Actor with ActorLogging {
import HttpManager._
import httpSettings._
import context.dispatcher
// counters for naming the various sub-actors we create
private[this] val listenerCounter = Iterator from 0
private[this] val groupCounter = Iterator from 0
private[this] val hostConnectorCounter = Iterator from 0
private[this] val proxyConnectorCounter = Iterator from 0
private[this] var clientPipelines = Map.empty[ClientConnectionSettings, HttpClientPipeline]
// our child actors
private[this] var settingsGroups = Map.empty[ClientConnectionSettings, ActorRef]
private[this] var connectors = Map.empty[Http.HostConnectorSetup, ActorRef]
private[this] var listeners = Seq.empty[ActorRef]
/////////////////// INITIAL / RUNNING STATE //////////////////////
def receive = withTerminationManagement {
case request: HttpRequest
try {
val uri = request.effectiveUri(securedConnection = false)
val connector = connectorForUri(uri)
// we never render absolute URIs, also drop any potentially existing fragment
connector.forward(request.copy(uri = uri.toRelative.withoutFragment))
} catch {
case NonFatal(e)
log.error("Illegal request: {}", e.getMessage)
sender() ! Status.Failure(e)
}
// 3xx Redirect, sent up from one of our HttpHostConnector children for us to
// forward to the respective HttpHostConnector for the redirection target
case ctx @ HttpHostConnector.RequestContext(req, _, _, commander)
val connector = connectorForUri(req.uri)
// we never render absolute URIs, also drop any potentially existing fragment
val newReq = req.copy(uri = req.uri.toRelative.withoutFragment)
connector.tell(ctx.copy(request = newReq), commander)
case connect: Http.Connect
settingsGroupFor(ClientConnectionSettings(connect.settings)).forward(connect)
case setup: Http.HostConnectorSetup
val connector = connectorFor(setup)
sender().tell(Http.HostConnectorInfo(connector, setup), connector)
// we support sending an HttpRequest instance together with a corresponding HostConnectorSetup
// in one step (rather than sending the setup first and having to wait for the response)
case (request: HttpRequest, setup: Http.HostConnectorSetup)
connectorFor(setup).forward(request)
case Http.HttpRequestChannelSetup sender() ! Http.HttpRequestChannelInfo
case bind: Http.Bind
def receive = {
case connect @ Http.Connect(remoteAddress, localAddress, options, settings, materializerSettings)
log.debug("Attempting connection to {}", remoteAddress)
val commander = sender()
listeners :+= context.watch {
context.actorOf(
props = HttpListener.props(commander, bind, httpSettings),
name = "listener-" + listenerCounter.next())
val effectiveSettings = ClientConnectionSettings(settings)
val tcpConnect = StreamTcp.Connect(materializerSettings, remoteAddress, localAddress, options,
effectiveSettings.connectingTimeout, effectiveSettings.idleTimeout)
val askTimeout = Timeout(effectiveSettings.connectingTimeout + 5.seconds) // FIXME: how can we improve this?
val tcpConnectionFuture = IO(StreamTcp)(context.system).ask(tcpConnect)(askTimeout)
tcpConnectionFuture onComplete {
case Success(tcpConn: StreamTcp.OutgoingTcpConnection)
val pipeline = clientPipelines.getOrElse(effectiveSettings, {
val pl = new HttpClientPipeline(effectiveSettings, FlowMaterializer(materializerSettings), log)
clientPipelines = clientPipelines.updated(effectiveSettings, pl)
pl
})
commander ! pipeline(tcpConn)
case Failure(error)
log.debug("Could not connect to {} due to {}", remoteAddress, error)
commander ! Status.Failure(new Http.ConnectionAttemptFailedException(remoteAddress))
case x throw new IllegalStateException("Unexpected response to `Connect` from StreamTcp: " + x)
}
case cmd: Http.CloseCommand
// start triggering an orderly complete shutdown by first closing all outgoing connections
shutdownSettingsGroups(cmd, Set(sender()))
}
case Http.Bind(endpoint, backlog, options, settings, materializerSettings)
log.debug("Binding to {}", endpoint)
val commander = sender()
val effectiveSettings = ServerSettings(settings)
val tcpBind = StreamTcp.Bind(materializerSettings, endpoint, backlog, options)
val askTimeout = Timeout(effectiveSettings.bindTimeout + 5.seconds) // FIXME: how can we improve this?
val tcpServerBindingFuture = IO(StreamTcp)(context.system).ask(tcpBind)(askTimeout)
tcpServerBindingFuture onComplete {
case Success(StreamTcp.TcpServerBinding(localAddress, connectionStream))
log.info("Bound to {}", endpoint)
val materializer = FlowMaterializer(materializerSettings)
val httpServerPipeline = new HttpServerPipeline(effectiveSettings, materializer, log)
val httpConnectionStream = Flow(connectionStream)
.map(httpServerPipeline)
.toProducer(materializer)
commander ! Http.ServerBinding(localAddress, httpConnectionStream)
def withTerminationManagement(behavior: Receive): Receive = ({
case ev @ Terminated(child)
if (listeners contains child)
listeners = listeners filter (_ != child)
else if (connectors exists (_._2 == child))
connectors = connectors filter { _._2 != child }
else
settingsGroups = settingsGroups filter { _._2 != child }
behavior.applyOrElse(ev, (_: Terminated) ())
case Failure(error)
log.warning("Bind to {} failed due to ", endpoint, error)
commander ! Status.Failure(Http.BindFailedException)
case HttpHostConnector.DemandIdleShutdown
val hostConnector = sender()
var sendPoisonPill = true
connectors = connectors filter {
case (x: ProxyConnectorSetup, proxiedConnector) if x.proxyConnector == hostConnector
proxiedConnector ! HttpHostConnector.DemandIdleShutdown
sendPoisonPill = false // the PoisonPill will be sent by the proxiedConnector
false
case (_, `hostConnector`) false
case _ true
case x throw new IllegalStateException("Unexpected response to `Bind` from StreamTcp: " + x)
}
if (sendPoisonPill) hostConnector ! PoisonPill
}: Receive) orElse behavior
def connectorForUri(uri: Uri) = {
val host = uri.authority.host
connectorFor(Http.HostConnectorSetup(host.toString(), uri.effectivePort, sslEncryption = uri.scheme == "https"))
}
def connectorFor(setup: Http.HostConnectorSetup) = {
val normalizedSetup = resolveAutoProxied(setup)
import Http.ClientConnectionType._
normalizedSetup.connectionType match {
case _: Proxied proxiedConnectorFor(normalizedSetup)
case Direct hostConnectorFor(normalizedSetup)
case AutoProxied throw new IllegalStateException("Unexpected unresolved connectionType `AutoProxied`")
}
}
def proxiedConnectorFor(normalizedSetup: Http.HostConnectorSetup): ActorRef = {
val Http.ClientConnectionType.Proxied(proxyHost, proxyPort) = normalizedSetup.connectionType
val proxyConnector = hostConnectorFor(normalizedSetup.copy(host = proxyHost, port = proxyPort))
val proxySetup = proxyConnectorSetup(normalizedSetup, proxyConnector)
def createAndRegisterProxiedConnector = {
val proxiedConnector = context.actorOf(
props = ProxiedHostConnector.props(normalizedSetup.host, normalizedSetup.port, proxyConnector),
name = "proxy-connector-" + proxyConnectorCounter.next())
connectors = connectors.updated(proxySetup, proxiedConnector)
context.watch(proxiedConnector)
}
connectors.getOrElse(proxySetup, createAndRegisterProxiedConnector)
}
def hostConnectorFor(normalizedSetup: Http.HostConnectorSetup): ActorRef = {
def createAndRegisterHostConnector = {
val settingsGroup = settingsGroupFor(normalizedSetup.settings.get.connectionSettings)
val hostConnector = context.actorOf(
props = HttpHostConnector.props(normalizedSetup, settingsGroup, HostConnectorDispatcher),
name = "host-connector-" + hostConnectorCounter.next())
connectors = connectors.updated(normalizedSetup, hostConnector)
context.watch(hostConnector)
}
connectors.getOrElse(normalizedSetup, createAndRegisterHostConnector)
}
def settingsGroupFor(settings: ClientConnectionSettings): ActorRef = {
def createAndRegisterSettingsGroup = {
val group = context.actorOf(
props = HttpClientSettingsGroup.props(settings, httpSettings),
name = "group-" + groupCounter.next())
settingsGroups = settingsGroups.updated(settings, group)
context.watch(group)
}
settingsGroups.getOrElse(settings, createAndRegisterSettingsGroup)
}
/////////////////// ORDERLY SHUTDOWN PROCESS //////////////////////
// TODO: add configurable timeouts for these shutdown steps
def shutdownSettingsGroups(cmd: Http.CloseCommand, commanders: Set[ActorRef]): Unit =
if (!settingsGroups.isEmpty) {
settingsGroups.values.foreach(_ ! cmd)
context.become(closingSettingsGroups(cmd, commanders))
} else shutdownHostConnectors(cmd, commanders) // if we are done with the outgoing connections, close all host connectors
def closingSettingsGroups(cmd: Http.CloseCommand, commanders: Set[ActorRef]): Receive =
withTerminationManagement {
case _: Http.CloseCommand // the first CloseCommand we received has precedence over ones potentially sent later
context.become(closingSettingsGroups(cmd, commanders + sender()))
case Terminated(_)
if (settingsGroups.isEmpty) // if we are done with the outgoing connections, close all host connectors
shutdownHostConnectors(cmd, commanders)
}
def shutdownHostConnectors(cmd: Http.CloseCommand, commanders: Set[ActorRef]): Unit =
if (!connectors.isEmpty) {
connectors.values.foreach(_ ! cmd)
context.become(closingConnectors(cmd, commanders))
} else shutdownListeners(cmd, commanders) // if we are done with the host connectors, close all listeners
def closingConnectors(cmd: Http.CloseCommand, commanders: Set[ActorRef]): Receive =
withTerminationManagement {
case _: Http.CloseCommand // the first CloseCommand we received has precedence over ones potentially sent later
context.become(closingConnectors(cmd, commanders + sender()))
case Terminated(_)
if (connectors.isEmpty) // if we are done with the host connectors, close all listeners
shutdownListeners(cmd, commanders)
}
def shutdownListeners(cmd: Http.CloseCommand, commanders: Set[ActorRef]): Unit = {
listeners foreach { x x ! cmd }
context.become(unbinding(cmd, commanders))
if (listeners.isEmpty) self ! Http.Unbound
}
def unbinding(cmd: Http.CloseCommand, commanders: Set[ActorRef]): Receive =
withTerminationManagement {
case _: Http.CloseCommand // the first CloseCommand we received has precedence over ones potentially sent later
context.become(unbinding(cmd, commanders + sender()))
case Terminated(_)
if (connectors.isEmpty) {
// if we are done with the listeners we have completed the full orderly shutdown
commanders.foreach(_ ! cmd.event)
context.become(receive)
}
}
}
private[http] object HttpManager {
def props(httpSettings: HttpExt#Settings) =
Props(classOf[HttpManager], httpSettings) withDispatcher httpSettings.ManagerDispatcher
private class ProxyConnectorSetup(host: String, port: Int, sslEncryption: Boolean,
options: immutable.Traversable[Inet.SocketOption],
settings: Option[HostConnectorSettings], connectionType: Http.ClientConnectionType,
defaultHeaders: immutable.Seq[HttpHeader], val proxyConnector: ActorRef)
extends Http.HostConnectorSetup(host, port, sslEncryption, options, settings, connectionType, defaultHeaders)
private def proxyConnectorSetup(normalizedSetup: Http.HostConnectorSetup, proxyConnector: ActorRef) = {
import normalizedSetup._
new ProxyConnectorSetup(host, port, sslEncryption, options, settings, connectionType, defaultHeaders, proxyConnector)
}
def resolveAutoProxied(setup: Http.HostConnectorSetup)(implicit refFactory: ActorRefFactory) = {
val normalizedSetup = setup.normalized
import normalizedSetup._
val resolved =
if (sslEncryption) Http.ClientConnectionType.Direct // TODO
else connectionType match {
case Http.ClientConnectionType.AutoProxied
val scheme = Uri.httpScheme(sslEncryption)
val proxySettings = settings.get.connectionSettings.proxySettings.get(scheme)
proxySettings.filter(_.matchesHost(host)) match {
case Some(ProxySettings(proxyHost, proxyPort, _)) Http.ClientConnectionType.Proxied(proxyHost, proxyPort)
case None Http.ClientConnectionType.Direct
}
case x x
}
normalizedSetup.copy(connectionType = resolved)
}
}

View file

@ -5,7 +5,7 @@
package akka.http.client
import com.typesafe.config.Config
import scala.concurrent.duration.Duration
import scala.concurrent.duration.{ FiniteDuration, Duration }
import akka.actor.ActorRefFactory
import akka.http.model.headers.`User-Agent`
import akka.http.parsing.ParserSettings
@ -13,38 +13,25 @@ import akka.http.util._
final case class ClientConnectionSettings(
userAgentHeader: Option[`User-Agent`],
connectingTimeout: Duration,
connectingTimeout: FiniteDuration,
idleTimeout: Duration,
requestTimeout: Duration,
reapingCycle: Duration,
requestHeaderSizeHint: Int,
maxEncryptionChunkSize: Int,
proxySettings: Map[String, ProxySettings],
parserSettings: ParserSettings) {
require(connectingTimeout >= Duration.Zero, "connectingTimeout must be > 0 or 'infinite'")
require(idleTimeout >= Duration.Zero, "idleTimeout must be > 0 or 'infinite'")
require(requestTimeout >= Duration.Zero, "requestTimeout must be > 0 or 'infinite'")
require(reapingCycle >= Duration.Zero, "reapingCycle must be > 0 or 'infinite'")
require(connectingTimeout >= Duration.Zero, "connectingTimeout must be > 0")
require(requestHeaderSizeHint > 0, "request-size-hint must be > 0")
require(maxEncryptionChunkSize > 0, "max-encryption-chunk-size must be > 0")
}
object ClientConnectionSettings extends SettingsCompanion[ClientConnectionSettings]("akka.http.client") {
def fromSubConfig(c: Config) = {
apply(
c.getString("user-agent-header").toOption.map(`User-Agent`(_)),
c getPotentiallyInfiniteDuration "connecting-timeout",
c getFiniteDuration "connecting-timeout",
c getPotentiallyInfiniteDuration "idle-timeout",
c getPotentiallyInfiniteDuration "request-timeout",
c getPotentiallyInfiniteDuration "reaping-cycle",
c getIntBytes "request-header-size-hint",
c getIntBytes "max-encryption-chunk-size",
ProxySettings fromSubConfig c.getConfig("proxy"),
ParserSettings fromSubConfig c.getConfig("parsing"))
}
def apply(optionalSettings: Option[ClientConnectionSettings])(implicit actorRefFactory: ActorRefFactory): ClientConnectionSettings =
optionalSettings getOrElse apply(actorSystem)
}
}
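For illustration, these client-connection settings are read from `akka.http.client` (per the `SettingsCompanion` path above), so an application.conf override could look roughly like this. A sketch with made-up values; the proxy `default`/`none` shorthands follow the `ProxySettings` parsing shown further below.

akka.http.client {
  user-agent-header = "my-client/1.0"
  connecting-timeout = 5 s    # must now be a finite duration
  idle-timeout = 120 s
  proxy {
    http = default            # fall back to the JVM's http.proxyHost / http.proxyPort properties
    https = none              # never proxy https requests
  }
}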

View file

@ -1,33 +0,0 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.http.client
import com.typesafe.config.Config
import scala.concurrent.duration.Duration
import akka.http.util._
final case class HostConnectorSettings(
maxConnections: Int,
maxRetries: Int,
maxRedirects: Int,
pipelining: Boolean,
idleTimeout: Duration,
connectionSettings: ClientConnectionSettings) {
require(maxConnections > 0, "max-connections must be > 0")
require(maxRetries >= 0, "max-retries must be >= 0")
require(maxRedirects >= 0, "max-redirects must be >= 0")
require(idleTimeout >= Duration.Zero, "idleTimeout must be > 0 or 'infinite'")
}
object HostConnectorSettings extends SettingsCompanion[HostConnectorSettings]("akka.http") {
def fromSubConfig(c: Config) = apply(
c getInt "host-connector.max-connections",
c getInt "host-connector.max-retries",
c getInt "host-connector.max-redirects",
c getBoolean "host-connector.pipelining",
c getPotentiallyInfiniteDuration "host-connector.idle-timeout",
ClientConnectionSettings fromSubConfig c.getConfig("client"))
}

View file

@ -0,0 +1,83 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.http.client
import java.net.InetSocketAddress
import scala.collection.immutable.Queue
import akka.stream.{ FlattenStrategy, FlowMaterializer }
import akka.event.LoggingAdapter
import akka.stream.io.StreamTcp
import akka.stream.scaladsl.{ Flow, Duct }
import akka.http.Http
import akka.http.model.{ HttpMethod, HttpRequest, ErrorInfo, HttpResponse }
import akka.http.rendering.{ RequestRenderingContext, HttpRequestRendererFactory }
import akka.http.parsing.HttpResponseParser
import akka.http.parsing.ParserOutput._
import akka.http.util._
/**
* INTERNAL API
*/
private[http] class HttpClientPipeline(effectiveSettings: ClientConnectionSettings,
materializer: FlowMaterializer,
log: LoggingAdapter)
extends (StreamTcp.OutgoingTcpConnection ⇒ Http.OutgoingConnection) {
import effectiveSettings._
val rootParser = new HttpResponseParser(parserSettings, materializer)()
val warnOnIllegalHeader: ErrorInfo ⇒ Unit = errorInfo ⇒
if (parserSettings.illegalHeaderWarnings)
log.warning(errorInfo.withSummaryPrepended("Illegal response header").formatPretty)
val responseRendererFactory = new HttpRequestRendererFactory(userAgentHeader, requestHeaderSizeHint, materializer, log)
def apply(tcpConn: StreamTcp.OutgoingTcpConnection): Http.OutgoingConnection = {
val requestMethodByPass = new RequestMethodByPass(tcpConn.remoteAddress)
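// the caller-supplied request context (the second tuple element) is split off here and zipped
// back onto the parsed responses below, so every response leaves the pipeline paired with the
// context of the request it answers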
val (contextBypassConsumer, contextBypassProducer) =
Duct[(HttpRequest, Any)].map(_._2).build(materializer)
val requestConsumer =
Duct[(HttpRequest, Any)]
.tee(contextBypassConsumer)
.map(requestMethodByPass)
.transform(responseRendererFactory.newRenderer)
.flatten(FlattenStrategy.concat)
.transform(errorLogger(log, "Outgoing request stream error"))
.produceTo(materializer, tcpConn.outputStream)
val responseProducer =
Flow(tcpConn.inputStream)
.transform(rootParser.copyWith(warnOnIllegalHeader, requestMethodByPass))
.splitWhen(_.isInstanceOf[MessageStart])
.headAndTail(materializer)
.collect {
case (ResponseStart(statusCode, protocol, headers, createEntity, _), entityParts) ⇒
HttpResponse(statusCode, headers, createEntity(entityParts), protocol)
}
.zip(contextBypassProducer)
.toProducer(materializer)
val processor = HttpClientProcessor(requestConsumer.getSubscriber, responseProducer.getPublisher)
Http.OutgoingConnection(tcpConn.remoteAddress, tcpConn.localAddress, processor)
}
class RequestMethodByPass(serverAddress: InetSocketAddress)
extends (((HttpRequest, Any)) ⇒ RequestRenderingContext) with (() ⇒ HttpMethod) {
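// remembers the method of every rendered request so that the response parser can take it into
// account when parsing the matching response (responses to HEAD requests, for example, carry
// no message body)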
private[this] var requestMethods = Queue.empty[HttpMethod]
def apply(tuple: (HttpRequest, Any)) = {
val request = tuple._1
requestMethods = requestMethods.enqueue(request.method)
RequestRenderingContext(request, serverAddress)
}
def apply(): HttpMethod =
if (requestMethods.nonEmpty) {
val method = requestMethods.head
requestMethods = requestMethods.tail
method
} else HttpResponseParser.NoMethod
}
}

View file

@ -0,0 +1,26 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.http.client
import akka.http.model.{ HttpResponse, HttpRequest }
import org.reactivestreams.spi.{ Publisher, Subscriber }
import org.reactivestreams.api.{ Consumer, Processor }
/**
* A `HttpClientProcessor` models an HTTP client as a stream processor that provides
* responses for requests with an attached context object of a custom type,
* which is funneled through and completely transparent to the processor itself.
*/
trait HttpClientProcessor[T] extends Processor[(HttpRequest, T), (HttpResponse, T)]
object HttpClientProcessor {
def apply[T](requestSubscriber: Subscriber[(HttpRequest, T)],
responsePublisher: Publisher[(HttpResponse, T)]): HttpClientProcessor[T] =
new HttpClientProcessor[T] {
def getSubscriber = requestSubscriber
def getPublisher = responsePublisher
def produceTo(consumer: Consumer[(HttpResponse, T)]): Unit = responsePublisher.subscribe(consumer.getSubscriber)
}
}
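A small sketch of what the context parameter buys (names and the `Long` request-id context are illustrative only, not part of this commit):

import akka.http.Http

def requestChannel(conn: Http.OutgoingConnection): HttpClientProcessor[Long] = {
  // the type parameter fixes the context type; here a Long request id is funneled through
  val processor = conn.processor[Long]
  // elements fed in:    (HttpRequest, 42L)
  // elements produced:  (HttpResponse, 42L) -- carrying the id of the request it answers
  processor
}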

View file

@ -1,24 +0,0 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.http.client
import akka.http.HttpExt
import akka.actor.{ Props, Actor }
/**
* INTERNAL API
*/
private[http] class HttpClientSettingsGroup(settings: ClientConnectionSettings,
httpSettings: HttpExt#Settings) extends Actor {
def receive: Receive = ??? // TODO
}
/**
* INTERNAL API
*/
private[http] object HttpClientSettingsGroup {
def props(settings: ClientConnectionSettings, httpSettings: HttpExt#Settings) =
Props(classOf[HttpClientSettingsGroup], httpSettings) withDispatcher httpSettings.SettingsGroupDispatcher
}

View file

@ -1,58 +0,0 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.http.client
import scala.collection.immutable
import akka.actor.{ Props, ActorRef, Actor, ActorLogging }
import akka.http.model.HttpRequest
import akka.http.Http
/**
* INTERNAL API
*/
private[http] class HttpHostConnector(normalizedSetup: Http.HostConnectorSetup, clientConnectionSettingsGroup: ActorRef)
extends Actor with ActorLogging {
def receive: Receive = ??? // TODO
}
/**
* INTERNAL API
*/
private[http] object HttpHostConnector {
def props(normalizedSetup: Http.HostConnectorSetup, clientConnectionSettingsGroup: ActorRef, dispatcher: String) =
Props(classOf[HttpHostConnector], normalizedSetup, clientConnectionSettingsGroup) withDispatcher dispatcher
final case class RequestContext(request: HttpRequest, retriesLeft: Int, redirectsLeft: Int, commander: ActorRef)
final case class Disconnected(rescheduledRequestCount: Int)
case object RequestCompleted
case object DemandIdleShutdown
sealed trait SlotState {
def enqueue(request: HttpRequest): SlotState
def dequeueOne: SlotState
def openRequestCount: Int
}
object SlotState {
sealed abstract class WithoutRequests extends SlotState {
def enqueue(request: HttpRequest) = Connected(immutable.Queue(request))
def dequeueOne = throw new IllegalStateException
def openRequestCount = 0
}
case object Unconnected extends WithoutRequests
final case class Connected(openRequests: immutable.Queue[HttpRequest]) extends SlotState {
require(openRequests.nonEmpty)
def enqueue(request: HttpRequest) = Connected(openRequests.enqueue(request))
def dequeueOne = {
val reqs = openRequests.tail
if (reqs.isEmpty) Idle
else Connected(reqs)
}
def openRequestCount = openRequests.size
}
case object Idle extends WithoutRequests
}
}

View file

@ -1,49 +0,0 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.http.client
import akka.http.model.{ HttpResponse, HttpRequest }
import org.reactivestreams.api.Processor
import java.net.InetSocketAddress
/**
* A `HttpClientProcessor` models an HTTP client as a stream processor that provides
* responses for requests with an attached context object of a custom type,
* which is funneled through and completely transparent to the processor itself.
*/
trait HttpClientProcessor[T] extends Processor[(HttpRequest, T), (HttpResponse, T)]
/**
* An `OutgoingHttpChannel` provides an `HttpClientProcessor` together with potentially
* available additional information about the client or server.
*/
sealed trait OutgoingHttpChannel {
def processor[T]: HttpClientProcessor[T]
}
/**
* An `OutgoingHttpChannel` with a single outgoing HTTP connection as the underlying transport.
*/
final case class OutgoingHttpConnection(remoteAddress: InetSocketAddress,
localAddress: InetSocketAddress,
untypedProcessor: HttpClientProcessor[Any]) extends OutgoingHttpChannel {
def processor[T] = untypedProcessor.asInstanceOf[HttpClientProcessor[T]]
}
/**
* An `OutgoingHttpChannel` with a connection pool to a specific host/port as the underlying transport.
*/
final case class HttpHostChannel(host: String, port: Int,
untypedProcessor: HttpClientProcessor[Any]) extends OutgoingHttpChannel {
def processor[T] = untypedProcessor.asInstanceOf[HttpClientProcessor[T]]
}
/**
* A general `OutgoingHttpChannel` with connection pools to all possible host/port combinations
* as the underlying transport.
*/
final case class HttpRequestChannel(untypedProcessor: HttpClientProcessor[Any]) extends OutgoingHttpChannel {
def processor[T] = untypedProcessor.asInstanceOf[HttpClientProcessor[T]]
}

View file

@ -1,58 +0,0 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.http.client
import akka.actor._
import akka.http.model
import model.{ HttpRequest, Uri }
/**
* INTERNAL API
*
* A wrapper around a [[HttpHostConnector]] that is connected to a proxy. Fixes missing Host headers and
* relative URIs or otherwise warns if these differ from the target host/port.
*/
private[http] class ProxiedHostConnector(host: String, port: Int, proxyConnector: ActorRef) extends Actor with ActorLogging {
import Uri._
val authority = Authority(Host(host), port).normalizedForHttp()
val hostHeader = model.headers.Host(host, authority.port)
context.watch(proxyConnector)
def receive: Receive = {
case request: HttpRequest
val headers = request.header[model.headers.Host] match {
case Some(reqHostHeader)
if (authority != Authority(reqHostHeader.host, reqHostHeader.port).normalizedForHttp())
log.warning(s"sending request with header '$reqHostHeader' to a proxied connection to $authority")
request.headers
case None request.headers :+ hostHeader
}
val effectiveUri =
if (request.uri.isRelative)
request.uri.toEffectiveHttpRequestUri(authority.host, port)
else {
if (authority != request.uri.authority.normalizedForHttp())
log.warning(s"sending request with absolute URI '${request.uri}' to a proxied connection to $authority")
request.uri
}
proxyConnector.forward(request.copy(uri = effectiveUri).withHeaders(headers))
case HttpHostConnector.DemandIdleShutdown
proxyConnector ! PoisonPill
context.stop(self)
case Terminated(`proxyConnector`)
context.stop(self)
case x proxyConnector.forward(x)
}
}
private[http] object ProxiedHostConnector {
def props(host: String, port: Int, proxyConnector: ActorRef) =
Props(classOf[ProxiedHostConnector], host, port, proxyConnector)
}

View file

@ -1,74 +0,0 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.http.client
import com.typesafe.config.{ ConfigValueType, Config }
import scala.annotation.tailrec
import scala.collection.JavaConverters._
import akka.http.util._
final case class ProxySettings(host: String, port: Int, nonProxyHosts: List[String]) {
require(host.nonEmpty, "proxy host must be non-empty")
require(0 < port && port < 65536, "illegal proxy port")
require(nonProxyHosts forall validIgnore, "illegal nonProxyHosts")
// see http://docs.oracle.com/javase/6/docs/technotes/guides/net/proxies.html
private def validIgnore(pattern: String) = pattern.exists(_ != '*') && !pattern.drop(1).dropRight(1).contains('*')
val matchesHost: String ⇒ Boolean = {
@tailrec def rec(remainingNonProxyHosts: List[String], result: String ⇒ Boolean): String ⇒ Boolean =
remainingNonProxyHosts match {
case Nil ⇒ result
case pattern :: remaining ⇒
val check: String ⇒ Boolean =
(pattern endsWith '*', pattern startsWith '*') match {
case (true, true) ⇒
val p = pattern.drop(1).dropRight(1); _.contains(p)
case (true, false) ⇒
val p = pattern.dropRight(1); _.startsWith(p)
case (false, true) ⇒
val p = pattern.drop(1); _.endsWith(p)
case _ ⇒ _ == pattern
}
rec(remaining, host ⇒ !check(host) && result(host))
}
rec(nonProxyHosts, result = _ ⇒ true)
}
}
object ProxySettings extends SettingsCompanion[Map[String, ProxySettings]]("akka.http.client.proxy") {
// see http://docs.oracle.com/javase/6/docs/technotes/guides/net/proxies.html
def fromProperties(properties: Map[String, String], scheme: String): Option[ProxySettings] = {
val proxyHost = properties.get(s"$scheme.proxyHost")
val proxyPort = properties.get(s"$scheme.proxyPort")
val nonProxyHosts = properties.get(s"$scheme.nonProxyHosts")
proxyHost map (apply(
_,
proxyPort.getOrElse("80").toInt,
nonProxyHosts.map(_.fastSplit('|')).getOrElse(Nil).toList))
}
def fromSubConfig(c: Config) = apply(c, sys.props.toMap): Map[String, ProxySettings]
def apply(c: Config, properties: Map[String, String]): Map[String, ProxySettings] = {
def proxySettings(scheme: String) = c.getValue(scheme).valueType() match {
case ConfigValueType.STRING ⇒
c.getString(scheme) match {
case "default" ⇒ fromProperties(properties, scheme).map((scheme, _))
case "none" ⇒ None
case unknown ⇒ throw new IllegalArgumentException(s"illegal value for proxy.$scheme: '$unknown'")
}
case _ ⇒
val cfg = c getConfig scheme
Some(scheme -> apply(
cfg getString "host",
cfg getInt "port",
(cfg getStringList "non-proxy-hosts").asScala.toList))
}
val schemes = c.entrySet.asScala.groupBy(_.getKey.split("\\.")(0)).keySet
schemes.flatMap(proxySettings).toMap
}
}
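For illustration, the `non-proxy-hosts` wildcard semantics (made-up values): `matchesHost` answers whether the proxy should be used for a given host, i.e. whether the host matches none of the ignore patterns.

val proxy = ProxySettings("proxy.example.com", 3128, List("*.internal.example.com", "localhost"))

proxy.matchesHost("www.example.org")           // true  -> route through the proxy
proxy.matchesHost("db.internal.example.com")   // false -> connect directly
proxy.matchesHost("localhost")                 // false -> connect directly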

View file

@ -158,6 +158,11 @@ object HttpEntity {
def extension: String
def isLastChunk: Boolean
}
object ChunkStreamPart {
implicit def apply(string: String): ChunkStreamPart = Chunk(string)
implicit def apply(bytes: Array[Byte]): ChunkStreamPart = Chunk(bytes)
implicit def apply(bytes: ByteString): ChunkStreamPart = Chunk(bytes)
}
/**
* An intermediate entity chunk guaranteed to carry non-empty data.
@ -166,6 +171,10 @@ object HttpEntity {
require(data.nonEmpty, "An HttpEntity.Chunk must have non-empty data")
def isLastChunk = false
}
object Chunk {
def apply(string: String): Chunk = apply(ByteString(string))
def apply(bytes: Array[Byte]): Chunk = apply(ByteString(bytes))
}
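With these implicits in scope, chunk stream elements can be created directly from strings, byte arrays or `ByteString`s; a small sketch:

import akka.util.ByteString
import HttpEntity.{ Chunk, ChunkStreamPart }

val fromString: ChunkStreamPart = "chunk data"            // via the String conversion
val fromBytes: ChunkStreamPart = Array[Byte](1, 2, 3)     // via the Array[Byte] conversion
val explicit = Chunk(ByteString("chunk data"))            // equivalent explicit form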
/**
* The final chunk of a chunk stream.

View file

@ -97,7 +97,9 @@ final case class HttpRequest(method: HttpMethod = HttpMethods.GET,
entity: HttpEntity.Regular = HttpEntity.Empty,
protocol: HttpProtocol = HttpProtocols.`HTTP/1.1`) extends HttpMessage {
require(!uri.isEmpty, "An HttpRequest must not have an empty Uri")
require(entity.isKnownEmpty || method.isEntityAccepted)
require(entity.isKnownEmpty || method.isEntityAccepted, "Requests with this method must have an empty entity")
require(protocol == HttpProtocols.`HTTP/1.1` || !entity.isInstanceOf[HttpEntity.Chunked],
"HTTP/1.0 requests must not have a chunked entity")
type Self = HttpRequest
@ -108,23 +110,8 @@ final case class HttpRequest(method: HttpMethod = HttpMethods.GET,
* Returns a copy of this request with the URI resolved according to the logic defined at
* http://tools.ietf.org/html/rfc7230#section-5.5
*/
def effectiveUri(securedConnection: Boolean, defaultHostHeader: Host = Host.empty): Uri = {
val hostHeader = header[Host]
if (uri.isRelative) {
def fail(detail: String) =
throw new IllegalUriException(s"Cannot establish effective request URI of $this, request has a relative URI and $detail")
val Host(host, port) = hostHeader match {
case None if (defaultHostHeader.isEmpty) fail("is missing a `Host` header") else defaultHostHeader
case Some(x) if x.isEmpty if (defaultHostHeader.isEmpty) fail("an empty `Host` header") else defaultHostHeader
case Some(x) x
}
uri.toEffectiveHttpRequestUri(host, port, securedConnection)
} else // http://tools.ietf.org/html/rfc7230#section-5.4
if (hostHeader.isEmpty || uri.authority.isEmpty && hostHeader.get.isEmpty ||
hostHeader.get.host.equalsIgnoreCase(uri.authority.host)) uri
else throw new IllegalUriException("'Host' header value doesn't match request target authority",
s"Host header: $hostHeader\nrequest target authority: ${uri.authority}")
}
def effectiveUri(securedConnection: Boolean, defaultHostHeader: Host = Host.empty): Uri =
HttpRequest.effectiveUri(uri, headers, securedConnection, defaultHostHeader)
/**
* The media-ranges accepted by the client according to the `Accept` request header.
@ -263,6 +250,30 @@ final case class HttpRequest(method: HttpMethod = HttpMethods.GET,
else this
}
object HttpRequest {
/**
* Determines the effective request URI according to the logic defined at
* http://tools.ietf.org/html/rfc7230#section-5.5
*/
def effectiveUri(uri: Uri, headers: immutable.Seq[HttpHeader], securedConnection: Boolean, defaultHostHeader: Host): Uri = {
val hostHeader = headers.collectFirst { case x: Host x }
if (uri.isRelative) {
def fail(detail: String) =
throw new IllegalUriException(s"Cannot establish effective URI of request to `$uri`, request has a relative URI and $detail")
val Host(host, port) = hostHeader match {
case None if (defaultHostHeader.isEmpty) fail("is missing a `Host` header") else defaultHostHeader
case Some(x) if x.isEmpty if (defaultHostHeader.isEmpty) fail("an empty `Host` header") else defaultHostHeader
case Some(x) x
}
uri.toEffectiveHttpRequestUri(host, port, securedConnection)
} else // http://tools.ietf.org/html/rfc7230#section-5.4
if (hostHeader.isEmpty || uri.authority.isEmpty && hostHeader.get.isEmpty ||
hostHeader.get.host.equalsIgnoreCase(uri.authority.host)) uri
else throw new IllegalUriException("'Host' header value of request to `$uri` doesn't match request target authority",
s"Host header: $hostHeader\nrequest target authority: ${uri.authority}")
}
}
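A quick illustration of the resolution logic (the concrete values are made up):

import akka.http.model._
import akka.http.model.headers.Host

val request = HttpRequest(uri = Uri("/docs"), headers = List(Host("example.com")))
// a relative request URI plus a `Host` header yields an absolute effective URI
request.effectiveUri(securedConnection = false)   // Uri("http://example.com/docs")
// with securedConnection = true the "https" scheme (and its default port) would be used instead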
/**
* The immutable HTTP response model.
*/

View file

@ -301,7 +301,7 @@ sealed abstract case class Expect private () extends ModeledHeader {
// http://tools.ietf.org/html/rfc7230#section-5.4
object Host extends ModeledCompanion {
def apply(address: InetSocketAddress): Host = apply(address.getHostName, address.getPort)
def apply(address: InetSocketAddress): Host = apply(address.getHostName, address.getPort) // TODO: upgrade to `getHostString` once we are on JDK7
def apply(host: String): Host = apply(host, 0)
def apply(host: String, port: Int): Host = apply(Uri.Host(host), port)
val empty = Host("")

View file

@ -29,6 +29,8 @@ private[http] abstract class HttpMessageParser[Output >: ParserOutput.MessageOut
private[this] val result = new ListBuffer[Output] // transformer op is currently optimized for LinearSeqs
private[this] var state: ByteString ⇒ StateResult = startNewMessage(_, 0)
private[this] var protocol: HttpProtocol = `HTTP/1.1`
private[this] var terminated = false
override def isComplete = terminated
def onNext(input: ByteString): immutable.Seq[Output] = {
result.clear()
@ -40,9 +42,13 @@ private[http] abstract class HttpMessageParser[Output >: ParserOutput.MessageOut
result.toList
}
def startNewMessage(input: ByteString, offset: Int): StateResult =
try parseMessage(input, offset)
catch { case NotEnoughDataException continue(input, offset)(startNewMessage) }
def startNewMessage(input: ByteString, offset: Int): StateResult = {
def _startNewMessage(input: ByteString, offset: Int): StateResult =
try parseMessage(input, offset)
catch { case NotEnoughDataException continue(input, offset)(_startNewMessage) }
_startNewMessage(input, offset)
}
def parseMessage(input: ByteString, offset: Int): StateResult
@ -83,16 +89,16 @@ private[http] abstract class HttpMessageParser[Output >: ParserOutput.MessageOut
parseHeaderLines(input, lineEnd, h :: headers, headerCount + 1, Some(h), clh, cth, teh, hh)
case h: `Content-Length`
if (clh.isEmpty) parseHeaderLines(input, lineEnd, h :: headers, headerCount + 1, ch, Some(h), cth, teh, hh)
if (clh.isEmpty) parseHeaderLines(input, lineEnd, headers, headerCount + 1, ch, Some(h), cth, teh, hh)
else fail("HTTP message must not contain more than one Content-Length header")
case h: `Content-Type`
if (cth.isEmpty) parseHeaderLines(input, lineEnd, h :: headers, headerCount + 1, ch, clh, Some(h), teh, hh)
if (cth.isEmpty) parseHeaderLines(input, lineEnd, headers, headerCount + 1, ch, clh, Some(h), teh, hh)
else if (cth.get == h) parseHeaderLines(input, lineEnd, headers, headerCount, ch, clh, cth, teh, hh)
else fail("HTTP message must not contain more than one Content-Type header")
case h: `Transfer-Encoding`
parseHeaderLines(input, lineEnd, h :: headers, headerCount + 1, ch, clh, cth, Some(h), hh)
parseHeaderLines(input, lineEnd, headers, headerCount + 1, ch, clh, cth, Some(h), hh)
case h if headerCount < settings.maxHeaderCount
parseHeaderLines(input, lineEnd, h :: headers, headerCount + 1, ch, clh, cth, teh, hh || h.isInstanceOf[Host])
@ -111,21 +117,23 @@ private[http] abstract class HttpMessageParser[Output >: ParserOutput.MessageOut
clh: Option[`Content-Length`], cth: Option[`Content-Type`], teh: Option[`Transfer-Encoding`],
hostHeaderPresent: Boolean, closeAfterResponseCompletion: Boolean): StateResult
def parseFixedLengthBody(remainingBodyBytes: Long)(input: ByteString, bodyStart: Int): StateResult = {
def parseFixedLengthBody(remainingBodyBytes: Long,
isLastMessage: Boolean)(input: ByteString, bodyStart: Int): StateResult = {
val remainingInputBytes = input.length - bodyStart
if (remainingInputBytes > 0) {
if (remainingInputBytes < remainingBodyBytes) {
emit(ParserOutput.EntityPart(input drop bodyStart))
continue(parseFixedLengthBody(remainingBodyBytes - remainingInputBytes))
continue(parseFixedLengthBody(remainingBodyBytes - remainingInputBytes, isLastMessage))
} else {
val offset = bodyStart + remainingBodyBytes.toInt
emit(ParserOutput.EntityPart(input.slice(bodyStart, offset)))
startNewMessage(input, offset)
if (isLastMessage) terminate()
else startNewMessage(input, offset)
}
} else continue(input, bodyStart)(parseFixedLengthBody(remainingBodyBytes))
} else continue(input, bodyStart)(parseFixedLengthBody(remainingBodyBytes, isLastMessage))
}
def parseChunk(input: ByteString, offset: Int): StateResult = {
def parseChunk(input: ByteString, offset: Int, isLastMessage: Boolean): StateResult = {
@tailrec def parseTrailer(extension: String, lineStart: Int, headers: List[HttpHeader] = Nil,
headerCount: Int = 0): StateResult = {
val lineEnd = headerParser.parseHeaderLine(input, lineStart)()
@ -134,7 +142,8 @@ private[http] abstract class HttpMessageParser[Output >: ParserOutput.MessageOut
val lastChunk =
if (extension.isEmpty && headers.isEmpty) HttpEntity.LastChunk else HttpEntity.LastChunk(extension, headers)
emit(ParserOutput.EntityChunk(lastChunk))
startNewMessage(input, lineEnd)
if (isLastMessage) terminate()
else startNewMessage(input, lineEnd)
case header if headerCount < settings.maxHeaderCount
parseTrailer(extension, lineEnd, header :: headers, headerCount + 1)
case _ fail(s"Chunk trailer contains more than the configured limit of ${settings.maxHeaderCount} headers")
@ -146,7 +155,7 @@ private[http] abstract class HttpMessageParser[Output >: ParserOutput.MessageOut
val chunkBodyEnd = cursor + chunkSize
def result(terminatorLen: Int) = {
emit(ParserOutput.EntityChunk(HttpEntity.Chunk(input.slice(cursor, chunkBodyEnd), extension)))
parseChunk(input, chunkBodyEnd + terminatorLen)
parseChunk(input, chunkBodyEnd + terminatorLen, isLastMessage)
}
byteChar(input, chunkBodyEnd) match {
case '\r' if byteChar(input, chunkBodyEnd + 1) == '\n' result(2)
@ -177,7 +186,7 @@ private[http] abstract class HttpMessageParser[Output >: ParserOutput.MessageOut
try parseSize(offset, 0)
catch {
case NotEnoughDataException ⇒ continue(input, offset)(parseChunk)
case NotEnoughDataException ⇒ continue(input, offset)(parseChunk(_, _, isLastMessage))
}
}
@ -190,12 +199,12 @@ private[http] abstract class HttpMessageParser[Output >: ParserOutput.MessageOut
case 0 ⇒ next(_, 0)
case 1 ⇒ throw new IllegalStateException
}
null // StateResult is a phantom type
done()
}
def continue(next: (ByteString, Int) ⇒ StateResult): StateResult = {
state = next(_, 0)
null // StateResult is a phantom type
done()
}
def fail(summary: String): StateResult = fail(summary, "")
@ -204,9 +213,16 @@ private[http] abstract class HttpMessageParser[Output >: ParserOutput.MessageOut
def fail(status: StatusCode, summary: String, detail: String = ""): StateResult = fail(status, ErrorInfo(summary, detail))
def fail(status: StatusCode, info: ErrorInfo): StateResult = {
emit(ParserOutput.ParseError(status, info))
null // StateResult is a phantom type
terminate()
}
def terminate(): StateResult = {
terminated = true
done()
}
def done(): StateResult = null // StateResult is a phantom type
def contentType(cth: Option[`Content-Type`]) = cth match {
case Some(x) ⇒ x.contentType
case None ⇒ ContentTypes.`application/octet-stream`

View file

@ -131,14 +131,14 @@ private[http] class HttpRequestParser(_settings: ParserSettings,
startNewMessage(input, bodyStart + cl)
} else {
emitRequestStart(defaultEntity(cth, contentLength, materializer))
parseFixedLengthBody(contentLength)(input, bodyStart)
parseFixedLengthBody(contentLength, closeAfterResponseCompletion)(input, bodyStart)
}
case Some(te) ⇒
if (te.encodings.size == 1 && te.hasChunked) {
if (clh.isEmpty) {
emitRequestStart(chunkedEntity(cth, materializer))
parseChunk(input, bodyStart)
parseChunk(input, bodyStart, closeAfterResponseCompletion)
} else fail("A chunked request must not contain a Content-Length header.")
} else fail(NotImplemented, s"`$te` is not supported by this server")
}

View file

@ -12,24 +12,26 @@ import akka.stream.scaladsl.Flow
import akka.util.ByteString
import akka.http.model._
import headers._
import HttpResponseParser.NoMethod
/**
* INTERNAL API
*/
private[http] class HttpResponseParser(_settings: ParserSettings,
materializer: FlowMaterializer)(_headerParser: HttpHeaderParser = HttpHeaderParser(_settings))
materializer: FlowMaterializer,
dequeueRequestMethodForNextResponse: () ⇒ HttpMethod = () ⇒ NoMethod)(_headerParser: HttpHeaderParser = HttpHeaderParser(_settings))
extends HttpMessageParser[ParserOutput.ResponseOutput](_settings, _headerParser) {
import HttpResponseParser.NoMethod
private[this] var requestMethodForCurrentResponse: HttpMethod = NoMethod
private[this] var statusCode: StatusCode = StatusCodes.OK
def copyWith(warnOnIllegalHeader: ErrorInfo ⇒ Unit): HttpResponseParser =
new HttpResponseParser(settings, materializer)(headerParser.copyWith(warnOnIllegalHeader))
def copyWith(warnOnIllegalHeader: ErrorInfo ⇒ Unit, dequeueRequestMethodForNextResponse: () ⇒ HttpMethod): HttpResponseParser =
new HttpResponseParser(settings, materializer, dequeueRequestMethodForNextResponse)(headerParser.copyWith(warnOnIllegalHeader))
def setRequestMethodForNextResponse(method: HttpMethod): Unit =
requestMethodForCurrentResponse = method
override def startNewMessage(input: ByteString, offset: Int): StateResult = {
requestMethodForCurrentResponse = dequeueRequestMethodForNextResponse()
super.startNewMessage(input, offset)
}
def parseMessage(input: ByteString, offset: Int): StateResult =
if (requestMethodForCurrentResponse ne NoMethod) {
@ -93,7 +95,7 @@ private[http] class HttpResponseParser(_settings: ParserSettings,
startNewMessage(input, bodyStart + cl)
} else {
emitResponseStart(defaultEntity(cth, contentLength, materializer))
parseFixedLengthBody(contentLength)(input, bodyStart)
parseFixedLengthBody(contentLength, closeAfterResponseCompletion)(input, bodyStart)
}
case None ⇒
emitResponseStart { entityParts ⇒
@ -107,7 +109,7 @@ private[http] class HttpResponseParser(_settings: ParserSettings,
if (te.encodings.size == 1 && te.hasChunked) {
if (clh.isEmpty) {
emitResponseStart(chunkedEntity(cth, materializer))
parseChunk(input, bodyStart)
parseChunk(input, bodyStart, closeAfterResponseCompletion)
} else fail("A chunked request must not contain a Content-Length header.")
} else fail(s"`$te` is not supported by this client")
}
@ -127,6 +129,6 @@ private[http] class HttpResponseParser(_settings: ParserSettings,
/**
* INTERNAL API
*/
private[parsing] object HttpResponseParser {
private[http] object HttpResponseParser {
val NoMethod = HttpMethod.custom("NONE", safe = false, idempotent = false, entityAccepted = false)
}

View file

@ -20,7 +20,6 @@ final case class ParserSettings(
maxChunkSize: Int,
uriParsingMode: Uri.ParsingMode,
illegalHeaderWarnings: Boolean,
sslSessionInfoHeader: Boolean,
headerValueCacheLimits: Map[String, Int]) {
require(maxUriLength > 0, "max-uri-length must be > 0")
@ -53,7 +52,6 @@ object ParserSettings extends SettingsCompanion[ParserSettings]("akka.http.parsi
c getIntBytes "max-chunk-size",
Uri.ParsingMode(c getString "uri-parsing-mode"),
c getBoolean "illegal-header-warnings",
c getBoolean "ssl-session-info-header",
cacheConfig.entrySet.asScala.map(kvp ⇒ kvp.getKey -> cacheConfig.getInt(kvp.getKey))(collection.breakOut))
}
}

View file

@ -0,0 +1,131 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.http.rendering
import java.net.InetSocketAddress
import org.reactivestreams.api.Producer
import scala.annotation.tailrec
import scala.collection.immutable
import akka.event.LoggingAdapter
import akka.util.ByteString
import akka.stream.scaladsl.Flow
import akka.stream.{ FlowMaterializer, Transformer }
import akka.stream.impl.SynchronousProducerFromIterable
import akka.http.model._
import akka.http.util._
import RenderSupport._
import headers._
/**
* INTERNAL API
*/
private[http] class HttpRequestRendererFactory(userAgentHeader: Option[headers.`User-Agent`],
requestHeaderSizeHint: Int,
materializer: FlowMaterializer,
log: LoggingAdapter) {
def newRenderer: HttpRequestRenderer = new HttpRequestRenderer
final class HttpRequestRenderer extends Transformer[RequestRenderingContext, Producer[ByteString]] {
def onNext(ctx: RequestRenderingContext): immutable.Seq[Producer[ByteString]] = {
val r = new ByteStringRendering(requestHeaderSizeHint)
import ctx.request._
def renderRequestLine(): Unit = {
r ~~ method ~~ ' '
val rawRequestUriRendered = headers.exists {
case `Raw-Request-URI`(rawUri) ⇒
r ~~ rawUri; true
case _ ⇒ false
}
if (!rawRequestUriRendered) UriRendering.renderUriWithoutFragment(r, uri, UTF8)
r ~~ ' ' ~~ protocol ~~ CrLf
}
def render(h: HttpHeader) = r ~~ h ~~ CrLf
@tailrec def renderHeaders(remaining: List[HttpHeader], hostHeaderSeen: Boolean = false,
userAgentSeen: Boolean = false): Unit =
remaining match {
case head :: tail ⇒ head match {
case x: `Content-Length` ⇒
suppressionWarning(log, x, "explicit `Content-Length` header is not allowed. Use the appropriate HttpEntity subtype.")
renderHeaders(tail, hostHeaderSeen, userAgentSeen)
case x: `Content-Type` ⇒
suppressionWarning(log, x, "explicit `Content-Type` header is not allowed. Set `HttpRequest.entity.contentType` instead.")
renderHeaders(tail, hostHeaderSeen, userAgentSeen)
case `Transfer-Encoding`(_) ⇒
suppressionWarning(log, head)
renderHeaders(tail, hostHeaderSeen, userAgentSeen)
case x: `Host` ⇒
render(x)
renderHeaders(tail, hostHeaderSeen = true, userAgentSeen)
case x: `User-Agent` ⇒
render(x)
renderHeaders(tail, hostHeaderSeen, userAgentSeen = true)
case x: `Raw-Request-URI` ⇒ // we never render this header
renderHeaders(tail, hostHeaderSeen, userAgentSeen)
case x: RawHeader if x.lowercaseName == "content-type" ||
x.lowercaseName == "content-length" ||
x.lowercaseName == "transfer-encoding" ||
x.lowercaseName == "host" ||
x.lowercaseName == "user-agent"
suppressionWarning(log, x, "illegal RawHeader")
renderHeaders(tail, hostHeaderSeen, userAgentSeen)
case x ⇒
render(x)
renderHeaders(tail, hostHeaderSeen, userAgentSeen)
}
case Nil ⇒
if (!hostHeaderSeen) r ~~ Host(ctx.serverAddress) ~~ CrLf
if (!userAgentSeen && userAgentHeader.isDefined) r ~~ userAgentHeader.get ~~ CrLf
}
def renderContentLength(contentLength: Long): Unit = {
if (method.isEntityAccepted) r ~~ `Content-Length` ~~ contentLength ~~ CrLf
r ~~ CrLf
}
def completeRequestRendering(): immutable.Seq[Producer[ByteString]] =
entity match {
case HttpEntity.Strict(contentType, data) ⇒
renderContentLength(data.length)
SynchronousProducerFromIterable(r.get :: data :: Nil) :: Nil
case HttpEntity.Default(contentType, contentLength, data) ⇒
renderContentLength(contentLength)
renderByteStrings(r,
Flow(data).transform(new CheckContentLengthTransformer(contentLength)).toProducer(materializer),
materializer)
case HttpEntity.Chunked(contentType, chunks) ⇒
r ~~ `Transfer-Encoding` ~~ Chunked ~~ CrLf ~~ CrLf
renderByteStrings(r, Flow(chunks).transform(new ChunkTransformer).toProducer(materializer), materializer)
}
renderRequestLine()
renderHeaders(headers.toList)
renderEntityContentType(r, entity)
if (entity.isKnownEmpty) {
renderContentLength(0)
SynchronousProducerFromIterable(r.get :: Nil) :: Nil
} else completeRequestRendering()
}
}
}
/**
* INTERNAL API
*/
private[http] final case class RequestRenderingContext(request: HttpRequest, serverAddress: InetSocketAddress)
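As a rough orientation (not part of this commit): the new renderer is a `Transformer[RequestRenderingContext, Producer[ByteString]]`, so it plugs into a client-side stream in the same way `HttpServerPipeline` plugs in the response renderer. A hedged sketch follows; the factory arguments, the target address, and the surrounding flow are illustrative assumptions.

// Sketch only: rendering a single GET request to a byte stream. Lives in akka.http.rendering
// so the private[http] classes above are accessible; concrete values are made up.
package akka.http.rendering

import java.net.InetSocketAddress
import akka.actor.ActorSystem
import akka.event.Logging
import akka.stream.scaladsl.Flow
import akka.stream.{ FlattenStrategy, FlowMaterializer, MaterializerSettings }
import akka.http.model._

object RequestRenderingSketch extends App {
  implicit val system = ActorSystem("request-rendering-sketch")
  val materializer = FlowMaterializer(MaterializerSettings())
  val log = Logging(system, getClass)

  val rendererFactory = new HttpRequestRendererFactory(
    userAgentHeader = Some(headers.`User-Agent`("akka-http-sketch/0.1")),
    requestHeaderSizeHint = 64,
    materializer = materializer,
    log = log)

  val ctx = RequestRenderingContext(
    request = HttpRequest(HttpMethods.GET, uri = "/"),
    serverAddress = new InetSocketAddress("example.com", 80))

  // Each rendered request becomes a Producer[ByteString]; flatten them into one byte stream.
  val requestBytes = Flow(List(ctx))
    .transform(rendererFactory.newRenderer)
    .flatten(FlattenStrategy.concat)
    .toProducer(materializer)
}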

View file

@ -10,8 +10,8 @@ import scala.collection.immutable
import akka.event.LoggingAdapter
import akka.util.ByteString
import akka.stream.scaladsl.Flow
import akka.stream.{ FlowMaterializer, Transformer }
import akka.stream.impl.SynchronousProducerFromIterable
import akka.stream.{ FlowMaterializer, Transformer }
import akka.http.model._
import akka.http.util._
import RenderSupport._
@ -26,24 +26,26 @@ private[http] class HttpResponseRendererFactory(serverHeader: Option[headers.Ser
materializer: FlowMaterializer,
log: LoggingAdapter) {
private val serverHeaderPlusDateColonSP: Array[Byte] =
private val renderDefaultServerHeader: Rendering ⇒ Unit =
serverHeader match {
case None "Date: ".getAsciiBytes
case Some(header) (new ByteArrayRendering(64) ~~ header ~~ Rendering.CrLf ~~ "Date: ").get
case Some(h)
val bytes = (new ByteArrayRendering(32) ~~ h ~~ CrLf).get
_ ~~ bytes
case None ⇒ _ ⇒ ()
}
// as an optimization we cache the ServerAndDateHeader of the last second here
@volatile private[this] var cachedServerAndDateHeader: (Long, Array[Byte]) = (0L, null)
// as an optimization we cache the Date header of the last second here
@volatile private[this] var cachedDateHeader: (Long, Array[Byte]) = (0L, null)
private def serverAndDateHeader: Array[Byte] = {
var (cachedSeconds, cachedBytes) = cachedServerAndDateHeader
private def dateHeader: Array[Byte] = {
var (cachedSeconds, cachedBytes) = cachedDateHeader
val now = System.currentTimeMillis
if (now / 1000 != cachedSeconds) {
if (now - 1000 > cachedSeconds) {
cachedSeconds = now / 1000
val r = new ByteArrayRendering(serverHeaderPlusDateColonSP.length + 31)
dateTime(now).renderRfc1123DateTimeString(r ~~ serverHeaderPlusDateColonSP) ~~ CrLf
val r = new ByteArrayRendering(48)
dateTime(now).renderRfc1123DateTimeString(r ~~ headers.Date) ~~ CrLf
cachedBytes = r.get
cachedServerAndDateHeader = cachedSeconds -> cachedBytes
cachedDateHeader = cachedSeconds -> cachedBytes
}
cachedBytes
}
@ -52,7 +54,7 @@ private[http] class HttpResponseRendererFactory(serverHeader: Option[headers.Ser
def newRenderer: HttpResponseRenderer = new HttpResponseRenderer
class HttpResponseRenderer extends Transformer[ResponseRenderingContext, Producer[ByteString]] {
final class HttpResponseRenderer extends Transformer[ResponseRenderingContext, Producer[ByteString]] {
private[this] var close = false // signals whether the connection is to be closed after the current response
override def isComplete = close
@ -61,32 +63,36 @@ private[http] class HttpResponseRendererFactory(serverHeader: Option[headers.Ser
val r = new ByteStringRendering(responseHeaderSizeHint)
import ctx.response._
if (status eq StatusCodes.OK) r ~~ DefaultStatusLine else r ~~ StatusLineStart ~~ status ~~ CrLf
r ~~ serverAndDateHeader
val noEntity = entity.isKnownEmpty || ctx.requestMethod == HttpMethods.HEAD
def renderStatusLine(): Unit =
if (status eq StatusCodes.OK) r ~~ DefaultStatusLine else r ~~ StatusLineStart ~~ status ~~ CrLf
def render(h: HttpHeader) = r ~~ h ~~ CrLf
@tailrec def renderHeaders(remaining: List[HttpHeader], alwaysClose: Boolean = false,
connHeader: Connection = null): Unit = {
def render(h: HttpHeader) = r ~~ h ~~ CrLf
def suppressionWarning(h: HttpHeader, msg: String = "the akka-http-core layer sets this header automatically!"): Unit =
log.warning("Explicitly set response header '{}' is ignored, {}", h, msg)
connHeader: Connection = null, serverHeaderSeen: Boolean = false): Unit =
remaining match {
case head :: tail ⇒ head match {
case x: `Content-Length` ⇒
suppressionWarning(x, "explicit `Content-Length` header is not allowed. Use the appropriate HttpEntity subtype.")
renderHeaders(tail, alwaysClose, connHeader)
suppressionWarning(log, x, "explicit `Content-Length` header is not allowed. Use the appropriate HttpEntity subtype.")
renderHeaders(tail, alwaysClose, connHeader, serverHeaderSeen)
case x: `Content-Type` ⇒
suppressionWarning(x, "explicit `Content-Type` header is not allowed. Set `HttpResponse.entity.contentType`, instead.")
renderHeaders(tail, alwaysClose, connHeader)
suppressionWarning(log, x, "explicit `Content-Type` header is not allowed. Set `HttpResponse.entity.contentType` instead.")
renderHeaders(tail, alwaysClose, connHeader, serverHeaderSeen)
case `Transfer-Encoding`(_) | Date(_) | Server(_) ⇒
suppressionWarning(head)
renderHeaders(tail, alwaysClose, connHeader)
case `Transfer-Encoding`(_) | Date(_) ⇒
suppressionWarning(log, head)
renderHeaders(tail, alwaysClose, connHeader, serverHeaderSeen)
case x: `Connection` ⇒
val connectionHeader = if (connHeader eq null) x else Connection(x.tokens ++ connHeader.tokens)
renderHeaders(tail, alwaysClose, connectionHeader)
renderHeaders(tail, alwaysClose, connectionHeader, serverHeaderSeen)
case x: `Server` ⇒
render(x)
renderHeaders(tail, alwaysClose, connHeader, serverHeaderSeen = true)
case x: RawHeader if x.lowercaseName == "content-type" ||
x.lowercaseName == "content-length" ||
@ -94,15 +100,17 @@ private[http] class HttpResponseRendererFactory(serverHeader: Option[headers.Ser
x.lowercaseName == "date" ||
x.lowercaseName == "server" ||
x.lowercaseName == "connection"
suppressionWarning(x, "illegal RawHeader")
renderHeaders(tail, alwaysClose, connHeader)
suppressionWarning(log, x, "illegal RawHeader")
renderHeaders(tail, alwaysClose, connHeader, serverHeaderSeen)
case x ⇒
render(x)
renderHeaders(tail, alwaysClose, connHeader)
renderHeaders(tail, alwaysClose, connHeader, serverHeaderSeen)
}
case Nil ⇒
if (!serverHeaderSeen) renderDefaultServerHeader(r)
r ~~ dateHeader
close = alwaysClose ||
ctx.closeAfterResponseCompletion || // request wants to close
(connHeader != null && connHeader.hasClose) // application wants to close
@ -112,80 +120,46 @@ private[http] class HttpResponseRendererFactory(serverHeader: Option[headers.Ser
case _ ⇒ // no need for rendering
}
}
}
def renderByteStrings(entityBytes: Producer[ByteString]): immutable.Seq[Producer[ByteString]] = {
val messageStart = SynchronousProducerFromIterable(r.get :: Nil)
val messageBytes =
if (!entity.isKnownEmpty && ctx.requestMethod != HttpMethods.HEAD)
Flow(messageStart).concat(entityBytes).toProducer(materializer)
else messageStart
messageBytes :: Nil
}
def byteStrings(entityBytes: Producer[ByteString]): immutable.Seq[Producer[ByteString]] =
renderByteStrings(r, entityBytes, materializer, skipEntity = noEntity)
def renderContentType(entity: HttpEntity): Unit =
if (!entity.isKnownEmpty && entity.contentType != ContentTypes.NoContentType)
r ~~ `Content-Type` ~~ entity.contentType ~~ CrLf
def renderEntity(entity: HttpEntity): immutable.Seq[Producer[ByteString]] =
def completeResponseRendering(entity: HttpEntity): immutable.Seq[Producer[ByteString]] =
entity match {
case HttpEntity.Strict(contentType, data) ⇒
renderHeaders(headers.toList)
renderContentType(entity)
renderEntityContentType(r, entity)
r ~~ `Content-Length` ~~ data.length ~~ CrLf ~~ CrLf
if (!entity.isKnownEmpty && ctx.requestMethod != HttpMethods.HEAD) r ~~ data
SynchronousProducerFromIterable(r.get :: Nil) :: Nil
val entityBytes = if (noEntity) Nil else data :: Nil
SynchronousProducerFromIterable(r.get :: entityBytes) :: Nil
case HttpEntity.Default(contentType, contentLength, data) ⇒
renderHeaders(headers.toList)
renderContentType(entity)
renderEntityContentType(r, entity)
r ~~ `Content-Length` ~~ contentLength ~~ CrLf ~~ CrLf
renderByteStrings(Flow(data).transform(new CheckContentLengthTransformer(contentLength)).toProducer(materializer))
byteStrings(Flow(data).transform(new CheckContentLengthTransformer(contentLength)).toProducer(materializer))
case HttpEntity.CloseDelimited(contentType, data) ⇒
renderHeaders(headers.toList, alwaysClose = true)
renderContentType(entity)
renderEntityContentType(r, entity)
r ~~ CrLf
renderByteStrings(data)
byteStrings(data)
case HttpEntity.Chunked(contentType, chunks) ⇒
if (ctx.requestProtocol == `HTTP/1.0`)
renderEntity(HttpEntity.CloseDelimited(contentType, Flow(chunks).map(_.data).toProducer(materializer)))
completeResponseRendering(HttpEntity.CloseDelimited(contentType, Flow(chunks).map(_.data).toProducer(materializer)))
else {
renderHeaders(headers.toList)
renderContentType(entity)
if (!entity.isKnownEmpty) r ~~ `Transfer-Encoding` ~~ Chunked ~~ CrLf
renderEntityContentType(r, entity)
if (!entity.isKnownEmpty || ctx.requestMethod == HttpMethods.HEAD)
r ~~ `Transfer-Encoding` ~~ Chunked ~~ CrLf
r ~~ CrLf
renderByteStrings(Flow(chunks).transform(new ChunkTransformer).toProducer(materializer))
byteStrings(Flow(chunks).transform(new ChunkTransformer).toProducer(materializer))
}
}
renderEntity(entity)
}
}
class ChunkTransformer extends Transformer[HttpEntity.ChunkStreamPart, ByteString] {
var lastChunkSeen = false
def onNext(chunk: HttpEntity.ChunkStreamPart): immutable.Seq[ByteString] = {
if (chunk.isLastChunk) lastChunkSeen = true
renderChunk(chunk) :: Nil
}
override def isComplete = lastChunkSeen
override def onTermination(e: Option[Throwable]) = if (lastChunkSeen) Nil else defaultLastChunkBytes :: Nil
}
class CheckContentLengthTransformer(length: Long) extends Transformer[ByteString, ByteString] {
var sent = 0L
def onNext(elem: ByteString): immutable.Seq[ByteString] = {
sent += elem.length
if (sent > length)
throw new InvalidContentLengthException(s"Response had declared Content-Length $length but entity chunk stream amounts to more bytes")
elem :: Nil
}
override def onTermination(e: Option[Throwable]): immutable.Seq[ByteString] = {
if (sent < length)
throw new InvalidContentLengthException(s"Response had declared Content-Length $length but entity chunk stream amounts to ${length - sent} bytes less")
Nil
renderStatusLine()
completeResponseRendering(entity)
}
}
}

View file

@ -4,10 +4,16 @@
package akka.http.rendering
import org.reactivestreams.api.Producer
import scala.collection.immutable
import akka.parboiled2.CharUtils
import akka.http.model.{ HttpEntity, HttpHeader }
import akka.http.util._
import akka.util.ByteString
import akka.event.LoggingAdapter
import akka.stream.impl.SynchronousProducerFromIterable
import akka.stream.scaladsl.Flow
import akka.stream.{ FlowMaterializer, Transformer }
import akka.http.model._
import akka.http.util._
/**
* INTERNAL API
@ -25,7 +31,46 @@ private object RenderSupport {
val defaultLastChunkBytes: ByteString = renderChunk(HttpEntity.LastChunk)
def renderChunk(chunk: HttpEntity.ChunkStreamPart): ByteString = {
def renderEntityContentType(r: Rendering, entity: HttpEntity): Unit =
if (entity.contentType != ContentTypes.NoContentType)
r ~~ headers.`Content-Type` ~~ entity.contentType ~~ CrLf
def renderByteStrings(r: ByteStringRendering, entityBytes: Producer[ByteString], materializer: FlowMaterializer,
skipEntity: Boolean = false): immutable.Seq[Producer[ByteString]] = {
val messageStart = SynchronousProducerFromIterable(r.get :: Nil)
val messageBytes =
if (!skipEntity) Flow(messageStart).concat(entityBytes).toProducer(materializer)
else messageStart
messageBytes :: Nil
}
class ChunkTransformer extends Transformer[HttpEntity.ChunkStreamPart, ByteString] {
var lastChunkSeen = false
def onNext(chunk: HttpEntity.ChunkStreamPart): immutable.Seq[ByteString] = {
if (chunk.isLastChunk) lastChunkSeen = true
renderChunk(chunk) :: Nil
}
override def isComplete = lastChunkSeen
override def onTermination(e: Option[Throwable]) = if (lastChunkSeen) Nil else defaultLastChunkBytes :: Nil
}
class CheckContentLengthTransformer(length: Long) extends Transformer[ByteString, ByteString] {
var sent = 0L
def onNext(elem: ByteString): immutable.Seq[ByteString] = {
sent += elem.length
if (sent > length)
throw new InvalidContentLengthException(s"HTTP message had declared Content-Length $length but entity chunk stream amounts to more bytes")
elem :: Nil
}
override def onTermination(e: Option[Throwable]): immutable.Seq[ByteString] = {
if (sent < length)
throw new InvalidContentLengthException(s"HTTP message had declared Content-Length $length but entity chunk stream amounts to ${length - sent} bytes less")
Nil
}
}
private def renderChunk(chunk: HttpEntity.ChunkStreamPart): ByteString = {
import chunk._
val renderedSize = // buffer space required for rendering (without trailer)
CharUtils.numberOfHexDigits(data.length) +
@ -44,4 +89,8 @@ private object RenderSupport {
r ~~ CrLf
r.get
}
def suppressionWarning(log: LoggingAdapter, h: HttpHeader,
msg: String = "the akka-http-core layer sets this header automatically!"): Unit =
log.warning("Explicitly set HTTP header '{}' is ignored, {}", h, msg)
}
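For readers less familiar with the wire format these helpers produce, here is a tiny sketch (not part of the commit) of the chunked transfer-coding framing that `renderChunk` / `ChunkTransformer` emit, per RFC 7230 section 4.1. The `frame` helper is a simplified stand-in: it ignores trailer headers and extension-value quoting, and the real renderer's hex-digit case may differ.

package akka.http.rendering

// Sketch only: spell out the bytes a chunk and a bare last-chunk are framed as.
object ChunkFramingSketch extends App {
  def frame(data: String, extension: String = ""): String = {
    val size = java.lang.Integer.toHexString(data.length) // chunk size in hex
    val ext = if (extension.isEmpty) "" else ";" + extension
    size + ext + "\r\n" + data + "\r\n"
  }
  println(frame("abc"))                         // "3\r\nabc\r\n"
  println(frame("0123456789ABCDEF", "foo=bar")) // "10;foo=bar\r\n0123456789ABCDEF\r\n"
  println("0\r\n\r\n")                          // what defaultLastChunkBytes renders for a bare LastChunk
}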

View file

@ -1,134 +0,0 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.http.server
import scala.concurrent.duration._
import akka.io.{ Tcp, IO }
import akka.stream.io.{ TcpListenStreamActor, StreamTcp }
import akka.stream.{ Transformer, FlowMaterializer }
import akka.stream.scaladsl.Flow
import akka.http.util.Timestamp
import akka.http.{ Http, HttpExt }
import akka.actor._
/**
* INTERNAL API
*/
private[http] class HttpListener(bindCommander: ActorRef,
bind: Http.Bind,
httpSettings: HttpExt#Settings) extends Actor with ActorLogging {
import HttpListener._
import bind._
private val settings = bind.serverSettings getOrElse ServerSettings(context.system)
log.debug("Binding to {}", endpoint)
IO(StreamTcp)(context.system) ! StreamTcp.Bind(materializerSettings, endpoint, backlog, options)
context.setReceiveTimeout(settings.bindTimeout)
val httpServerPipeline = new HttpServerPipeline(settings, FlowMaterializer(materializerSettings), log)
// we cannot sensibly recover from crashes
override def supervisorStrategy = SupervisorStrategy.stoppingStrategy
def receive = binding
def binding: Receive = {
case StreamTcp.TcpServerBinding(localAddress, connectionStream) ⇒
log.info("Bound to {}", endpoint)
val materializer = FlowMaterializer(materializerSettings)
val httpConnectionStream = Flow(connectionStream)
.map(httpServerPipeline)
.transform {
new Transformer[Http.IncomingConnection, Http.IncomingConnection] {
def onNext(element: Http.IncomingConnection) = element :: Nil
override def cleanup() = shutdown(gracePeriod = Duration.Zero)
}
}.toProducer(materializer)
bindCommander ! Http.ServerBinding(localAddress, httpConnectionStream)
context.setReceiveTimeout(Duration.Undefined)
context.become(connected(sender()))
case Status.Failure(_: TcpListenStreamActor.TcpListenStreamException) ⇒
log.warning("Bind to {} failed", endpoint)
bindCommander ! Status.Failure(Http.BindFailedException)
context.stop(self)
case ReceiveTimeout ⇒
log.warning("Bind to {} failed, timeout {} expired", endpoint, settings.bindTimeout)
bindCommander ! Status.Failure(Http.BindFailedException)
context.stop(self)
case Http.Unbind(_) ⇒ // no children possible, so no reason to note the timeout
log.info("Bind to {} aborted", endpoint)
bindCommander ! Status.Failure(Http.BindFailedException)
context.become(bindingAborted(Set(sender())))
}
/** Waiting for the bind to execute to close it down instantly afterwards */
def bindingAborted(unbindCommanders: Set[ActorRef]): Receive = {
case _: StreamTcp.TcpServerBinding ⇒
unbind(sender(), unbindCommanders, Duration.Zero)
case Status.Failure(_: TcpListenStreamActor.TcpListenStreamException) ⇒
unbindCommanders foreach (_ ! Http.Unbound)
context.stop(self)
case ReceiveTimeout ⇒
unbindCommanders foreach (_ ! Http.Unbound)
context.stop(self)
case Http.Unbind(_) ⇒ context.become(bindingAborted(unbindCommanders + sender()))
}
def connected(tcpListener: ActorRef): Receive = {
case Http.Unbind(timeout) ⇒ unbind(tcpListener, Set(sender()), timeout)
}
def unbind(tcpListener: ActorRef, unbindCommanders: Set[ActorRef], timeout: Duration): Unit = {
tcpListener ! Tcp.Unbind
context.setReceiveTimeout(settings.unbindTimeout)
context.become(unbinding(unbindCommanders, timeout))
}
def unbinding(commanders: Set[ActorRef], gracePeriod: Duration): Receive = {
case Tcp.Unbound ⇒
log.info("Unbound from {}", endpoint)
commanders foreach (_ ! Http.Unbound)
shutdown(gracePeriod)
case ReceiveTimeout ⇒
log.warning("Unbinding from {} failed, timeout {} expired, stopping", endpoint, settings.unbindTimeout)
commanders foreach (_ ! Status.Failure(Http.UnbindFailedException))
context.stop(self)
case Http.Unbind(_) ⇒
// the first Unbind we received has precedence over ones potentially sent later
context.become(unbinding(commanders + sender(), gracePeriod))
}
def shutdown(gracePeriod: Duration): Unit =
if (gracePeriod == Duration.Zero || context.children.nonEmpty) {
context.setReceiveTimeout(Duration.Undefined)
self ! Tick
context.become(inShutdownGracePeriod(Timestamp.now + gracePeriod))
} else context.stop(self)
/** Wait for a last grace period to expire before shutting us (and our children down) */
def inShutdownGracePeriod(timeout: Timestamp): Receive = {
case Tick ⇒
if (timeout.isPast || context.children.isEmpty) context.stop(self)
else context.system.scheduler.scheduleOnce(1.second, self, Tick)(context.dispatcher)
}
}
private[http] object HttpListener {
def props(bindCommander: ActorRef, bind: Http.Bind, httpSettings: HttpExt#Settings) =
Props(new HttpListener(bindCommander, bind, httpSettings)) withDispatcher httpSettings.ListenerDispatcher
private case object Tick
}

View file

@ -6,7 +6,6 @@ package akka.http.server
import org.reactivestreams.api.Producer
import akka.event.LoggingAdapter
import akka.util.ByteString
import akka.stream.io.StreamTcp
import akka.stream.{ FlattenStrategy, Transformer, FlowMaterializer }
import akka.stream.scaladsl.{ Flow, Duct }
@ -15,6 +14,8 @@ import akka.http.rendering.{ ResponseRenderingContext, HttpResponseRendererFacto
import akka.http.model.{ StatusCode, ErrorInfo, HttpRequest, HttpResponse }
import akka.http.parsing.ParserOutput._
import akka.http.Http
import akka.http.util._
import akka.http.model.headers.Host
/**
* INTERNAL API
@ -23,7 +24,6 @@ private[http] class HttpServerPipeline(settings: ServerSettings,
materializer: FlowMaterializer,
log: LoggingAdapter)
extends (StreamTcp.IncomingTcpConnection ⇒ Http.IncomingConnection) {
import HttpServerPipeline._
val rootParser = new HttpRequestParser(settings.parserSettings, settings.rawRequestUriHeader, materializer)()
val warnOnIllegalHeader: ErrorInfo ⇒ Unit = errorInfo ⇒
@ -45,7 +45,11 @@ private[http] class HttpServerPipeline(settings: ServerSettings,
.splitWhen(_.isInstanceOf[MessageStart])
.headAndTail(materializer)
.tee(applicationBypassConsumer)
.collect { case (x: RequestStart, entityParts) ⇒ HttpServerPipeline.constructRequest(x, entityParts) }
.collect {
case (RequestStart(method, uri, protocol, headers, createEntity, _), entityParts) ⇒
val effectiveUri = HttpRequest.effectiveUri(uri, headers, securedConnection = false, settings.defaultHostHeader)
HttpRequest(method, effectiveUri, headers, createEntity(entityParts), protocol)
}
.toProducer(materializer)
val responseConsumer =
@ -54,12 +58,8 @@ private[http] class HttpServerPipeline(settings: ServerSettings,
.transform(applyApplicationBypass)
.transform(responseRendererFactory.newRenderer)
.flatten(FlattenStrategy.concat)
.transform {
new Transformer[ByteString, ByteString] {
def onNext(element: ByteString) = element :: Nil
override def onError(cause: Throwable): Unit = log.error(cause, "Response stream error")
}
}.produceTo(materializer, tcpConn.outputStream)
.transform(errorLogger(log, "Outgoing response stream error"))
.produceTo(materializer, tcpConn.outputStream)
Http.IncomingConnection(tcpConn.remoteAddress, requestProducer, responseConsumer)
}
@ -109,18 +109,4 @@ private[http] class HttpServerPipeline(settings: ServerSettings,
ResponseRenderingContext(HttpResponse(status, entity = msg), closeAfterResponseCompletion = true)
}
}
}
private[http] object HttpServerPipeline {
def constructRequest(requestStart: RequestStart, entityParts: Producer[RequestOutput]): HttpRequest = {
import requestStart._
HttpRequest(method, uri, headers, createEntity(entityParts), protocol)
}
implicit class FlowWithHeadAndTail[T](val underlying: Flow[Producer[T]]) {
def headAndTail(materializer: FlowMaterializer): Flow[(T, Producer[T])] =
underlying.map { p ⇒
Flow(p).prefixAndTail(1).map { case (prefix, tail) ⇒ (prefix.head, tail) }.toProducer(materializer)
}.flatten(FlattenStrategy.Concat())
}
}

View file

@ -7,6 +7,7 @@ package akka.http.server
import language.implicitConversions
import com.typesafe.config.Config
import scala.concurrent.duration._
import akka.actor.ActorRefFactory
import akka.http.parsing.ParserSettings
import akka.http.model.parser.HeaderParser
import akka.http.model.headers.{ Server, Host, RawHeader }
@ -15,64 +16,35 @@ import akka.ConfigurationException
final case class ServerSettings(
serverHeader: Option[Server],
sslEncryption: Boolean,
pipeliningLimit: Int,
timeouts: ServerSettings.Timeouts,
timeoutHandler: String,
reapingCycle: Duration,
remoteAddressHeader: Boolean,
rawRequestUriHeader: Boolean,
transparentHeadRequests: Boolean,
verboseErrorMessages: Boolean,
responseHeaderSizeHint: Int,
maxEncryptionChunkSize: Int,
defaultHostHeader: Host,
parserSettings: ParserSettings) {
require(reapingCycle >= Duration.Zero, "reapingCycle must be > 0 or 'infinite'")
require(0 <= pipeliningLimit && pipeliningLimit <= 128, "pipelining-limit must be >= 0 and <= 128")
require(0 <= responseHeaderSizeHint, "response-size-hint must be > 0")
require(0 < maxEncryptionChunkSize, "max-encryption-chunk-size must be > 0")
}
object ServerSettings extends SettingsCompanion[ServerSettings]("akka.http.server") {
final case class Timeouts(idleTimeout: Duration,
requestTimeout: Duration,
timeoutTimeout: Duration,
bindTimeout: Duration,
unbindTimeout: Duration,
parseErrorAbortTimeout: Duration) {
require(idleTimeout >= Duration.Zero, "idleTimeout must be > 0 or 'infinite'")
require(requestTimeout >= Duration.Zero, "requestTimeout must be > 0 or 'infinite'")
require(timeoutTimeout >= Duration.Zero, "timeoutTimeout must be > 0 or 'infinite'")
require(bindTimeout >= Duration.Zero, "bindTimeout must be > 0 or 'infinite'")
require(unbindTimeout >= Duration.Zero, "unbindTimeout must be > 0 or 'infinite'")
require(!requestTimeout.isFinite || idleTimeout > requestTimeout,
"idle-timeout must be > request-timeout (if the latter is not 'infinite')")
require(!idleTimeout.isFinite || idleTimeout > 1.second, // the current implementation is not fit for
"an idle-timeout < 1 second is not supported") // very short idle-timeout settings
bindTimeout: FiniteDuration) {
require(bindTimeout >= Duration.Zero, "bindTimeout must be > 0")
}
implicit def timeoutsShortcut(s: ServerSettings): Timeouts = s.timeouts
def fromSubConfig(c: Config) = apply(
c.getString("server-header").toOption.map(Server(_)),
c getBoolean "ssl-encryption",
c.getString("pipelining-limit") match { case "disabled" 0; case _ c getInt "pipelining-limit" },
Timeouts(
c getPotentiallyInfiniteDuration "idle-timeout",
c getPotentiallyInfiniteDuration "request-timeout",
c getPotentiallyInfiniteDuration "timeout-timeout",
c getPotentiallyInfiniteDuration "bind-timeout",
c getPotentiallyInfiniteDuration "unbind-timeout",
c getPotentiallyInfiniteDuration "parse-error-abort-timeout"),
c getString "timeout-handler",
c getPotentiallyInfiniteDuration "reaping-cycle",
c getFiniteDuration "bind-timeout"),
c getBoolean "remote-address-header",
c getBoolean "raw-request-uri-header",
c getBoolean "transparent-head-requests",
c getBoolean "verbose-error-messages",
c getIntBytes "response-header-size-hint",
c getIntBytes "max-encryption-chunk-size",
defaultHostHeader =
HeaderParser.parseHeader(RawHeader("Host", c getString "default-host-header")) match {
case Right(x: Host) ⇒ x
@ -80,5 +52,8 @@ object ServerSettings extends SettingsCompanion[ServerSettings]("akka.http.serve
case Right(_) ⇒ throw new IllegalStateException
},
ParserSettings fromSubConfig c.getConfig("parsing"))
def apply(optionalSettings: Option[ServerSettings])(implicit actorRefFactory: ActorRefFactory): ServerSettings =
optionalSettings getOrElse apply(actorSystem)
}
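As a usage note (not part of the diff): the slimmed-down `ServerSettings` can be obtained either from the ActorSystem configuration under the `akka.http.server` prefix or from an ad-hoc override string, as the `ClientServerSpec` test setup below does. A minimal sketch, with the override key and value chosen for illustration:

// Sketch only: two ways of constructing ServerSettings; un-overridden keys come from reference.conf.
import akka.actor.ActorSystem
import akka.http.server.ServerSettings

object ServerSettingsSketch extends App {
  implicit val system = ActorSystem("server-settings-sketch")

  val fromSystemConfig = ServerSettings(system)                              // reads akka.http.server.*
  val fromOverrides    = ServerSettings("akka.http.server.bind-timeout = 5s") // ad-hoc override string

  println(fromSystemConfig.timeouts.bindTimeout) // 1s per the default reference.conf
  println(fromOverrides.timeouts.bindTimeout)    // 5 seconds

  system.shutdown()
}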

View file

@ -4,7 +4,7 @@
package akka.http.util
import scala.concurrent.duration.Duration
import scala.concurrent.duration.{ FiniteDuration, Duration }
import com.typesafe.config.Config
import akka.ConfigurationException
@ -18,6 +18,11 @@ private[http] class EnhancedConfig(val underlying: Config) extends AnyVal {
case x ⇒ Duration(x)
}
def getFiniteDuration(path: String): FiniteDuration = Duration(underlying.getString(path)) match {
case x: FiniteDuration ⇒ x
case _ ⇒ throw new ConfigurationException(s"Config setting '$path' must be a finite duration")
}
def getPossiblyInfiniteInt(path: String): Int = underlying.getString(path) match {
case "infinite" Int.MaxValue
case x underlying.getInt(path)

View file

@ -132,7 +132,7 @@ private[http] object Renderer {
/**
* INTERNAL API
*
* The interface for a rendering sink. May be implemented with different backing stores.
* The interface for a rendering sink. Implemented for several serialization targets.
*/
private[http] trait Rendering {
def ~~(ch: Char): this.type

View file

@ -49,7 +49,7 @@ private[http] abstract class SettingsCompanion[T](prefix: String) {
object SettingsCompanion {
lazy val configAdditions: Config = {
val localHostName =
try InetAddress.getLocalHost.getHostName
try InetAddress.getLocalHost.getHostName // TODO: upgrade to `getHostString` once we are on JDK7
catch { case NonFatal(_) "" }
ConfigFactory.parseMap(Map("akka.http.hostname" -> localHostName).asJava)
}

View file

@ -5,10 +5,19 @@
package akka.http
import language.implicitConversions
import java.net.InetSocketAddress
import java.nio.channels.ServerSocketChannel
import java.nio.charset.Charset
import com.typesafe.config.Config
import org.reactivestreams.api.Producer
import akka.event.LoggingAdapter
import akka.util.ByteString
import akka.actor.{ ActorRefFactory, ActorContext, ActorSystem }
import akka.stream.scaladsl.Flow
import akka.stream.{ Transformer, FlattenStrategy, FlowMaterializer }
package object util {
private[http] val UTF8 = Charset.forName("UTF8")
private[http] def actorSystem(implicit refFactory: ActorRefFactory): ActorSystem =
refFactory match {
@ -20,5 +29,34 @@ package object util {
private[http] implicit def enhanceByteArray(array: Array[Byte]): EnhancedByteArray = new EnhancedByteArray(array)
private[http] implicit def enhanceConfig(config: Config): EnhancedConfig = new EnhancedConfig(config)
private[http] implicit def enhanceString_(s: String): EnhancedString = new EnhancedString(s)
private[http] implicit class FlowWithHeadAndTail[T](val underlying: Flow[Producer[T]]) extends AnyVal {
def headAndTail(materializer: FlowMaterializer): Flow[(T, Producer[T])] =
underlying.map { p ⇒
Flow(p).prefixAndTail(1).map { case (prefix, tail) ⇒ (prefix.head, tail) }.toProducer(materializer)
}.flatten(FlattenStrategy.Concat())
}
private[http] implicit class FlowWithPrintEvent[T](val underlying: Flow[T]) {
def printEvent(marker: String): Flow[T] =
underlying.transform {
new Transformer[T, T] {
def onNext(element: T) = {
println(s"$marker: $element")
element :: Nil
}
override def onTermination(e: Option[Throwable]) = {
println(s"$marker: Terminated with error $e")
Nil
}
}
}
}
private[http] def errorLogger(log: LoggingAdapter, msg: String): Transformer[ByteString, ByteString] =
new Transformer[ByteString, ByteString] {
def onNext(element: ByteString) = element :: Nil
override def onError(cause: Throwable): Unit = log.error(cause, msg)
}
}

View file

@ -0,0 +1,2 @@
# override strange reference.conf setting in akka-stream test scope
akka.actor.default-mailbox.mailbox-type = akka.dispatch.UnboundedMailbox

View file

@ -0,0 +1,172 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.http
import java.net.Socket
import java.io.{ InputStreamReader, BufferedReader, OutputStreamWriter, BufferedWriter }
import com.typesafe.config.{ ConfigFactory, Config }
import scala.annotation.tailrec
import scala.concurrent.duration._
import scala.concurrent.Await
import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpec }
import akka.actor.ActorSystem
import akka.testkit.TestProbe
import akka.io.IO
import akka.stream.{ FlowMaterializer, MaterializerSettings }
import akka.stream.testkit.{ ProducerProbe, ConsumerProbe, StreamTestKit }
import akka.stream.impl.SynchronousProducerFromIterable
import akka.stream.scaladsl.Flow
import akka.http.server.ServerSettings
import akka.http.client.ClientConnectionSettings
import akka.http.model._
import akka.http.util._
import headers._
import HttpMethods._
import HttpEntity._
import TestUtils._
class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll {
val testConf: Config = ConfigFactory.parseString("""
akka.event-handlers = ["akka.testkit.TestEventListener"]
akka.loglevel = WARNING""")
implicit val system = ActorSystem(getClass.getSimpleName, testConf)
import system.dispatcher
val materializerSettings = MaterializerSettings(dispatcher = "akka.test.stream-dispatcher")
val materializer = FlowMaterializer(materializerSettings)
"The server-side HTTP infrastructure" should {
"properly bind and unbind a server" in {
val (hostname, port) = temporaryServerHostnameAndPort()
val commander = TestProbe()
commander.send(IO(Http), Http.Bind(hostname, port, materializerSettings = materializerSettings))
val Http.ServerBinding(localAddress, connectionStream) = commander.expectMsgType[Http.ServerBinding]
localAddress.getHostName shouldEqual hostname
localAddress.getPort shouldEqual port
val c = StreamTestKit.consumerProbe[Http.IncomingConnection]
connectionStream.produceTo(c)
val sub = c.expectSubscription()
sub.cancel()
// TODO: verify unbinding effect
}
"properly complete a simple request/response cycle" in new TestSetup {
val (clientOut, clientIn) = openNewClientConnection[Symbol]()
val (serverIn, serverOut) = acceptConnection()
val clientOutSub = clientOut.expectSubscription()
clientOutSub.sendNext(HttpRequest(uri = "/abc") -> 'abcContext)
val serverInSub = serverIn.expectSubscription()
serverInSub.requestMore(1)
serverIn.expectNext().uri shouldEqual Uri(s"http://$hostname:$port/abc")
val serverOutSub = serverOut.expectSubscription()
serverOutSub.sendNext(HttpResponse(entity = "yeah"))
val clientInSub = clientIn.expectSubscription()
clientInSub.requestMore(1)
val (response, 'abcContext) = clientIn.expectNext()
toStrict(response.entity) shouldEqual HttpEntity("yeah")
}
"properly complete a chunked request/response cycle" in new TestSetup {
val (clientOut, clientIn) = openNewClientConnection[Long]()
val (serverIn, serverOut) = acceptConnection()
val chunks = List(Chunk("abc"), Chunk("defg"), Chunk("hijkl"), LastChunk)
val chunkedContentType: ContentType = MediaTypes.`application/base64`
val chunkedEntity = HttpEntity.Chunked(chunkedContentType, SynchronousProducerFromIterable(chunks))
val clientOutSub = clientOut.expectSubscription()
clientOutSub.sendNext(HttpRequest(POST, "/chunked", List(Accept(MediaRanges.`*/*`)), chunkedEntity) -> 12345678)
val serverInSub = serverIn.expectSubscription()
serverInSub.requestMore(1)
private val HttpRequest(POST, uri, List(`User-Agent`(_), Host(_, _), Accept(Vector(MediaRanges.`*/*`))),
Chunked(`chunkedContentType`, chunkStream), HttpProtocols.`HTTP/1.1`) = serverIn.expectNext()
uri shouldEqual Uri(s"http://$hostname:$port/chunked")
Await.result(Flow(chunkStream).grouped(4).toFuture(materializer), 100.millis) shouldEqual chunks
val serverOutSub = serverOut.expectSubscription()
serverOutSub.sendNext(HttpResponse(206, List(RawHeader("Age", "42")), chunkedEntity))
val clientInSub = clientIn.expectSubscription()
clientInSub.requestMore(1)
val (HttpResponse(StatusCodes.PartialContent, List(Date(_), Server(_), RawHeader("Age", "42")),
Chunked(`chunkedContentType`, chunkStream2), HttpProtocols.`HTTP/1.1`), 12345678) = clientIn.expectNext()
Await.result(Flow(chunkStream2).grouped(1000).toFuture(materializer), 100.millis) shouldEqual chunks
}
}
override def afterAll() = system.shutdown()
class TestSetup {
val (hostname, port) = temporaryServerHostnameAndPort()
val bindHandler = TestProbe()
def configOverrides = ""
// automatically bind a server
val connectionStream: ConsumerProbe[Http.IncomingConnection] = {
val commander = TestProbe()
val settings = configOverrides.toOption.map(ServerSettings.apply)
commander.send(IO(Http), Http.Bind(hostname, port, serverSettings = settings, materializerSettings = materializerSettings))
val probe = StreamTestKit.consumerProbe[Http.IncomingConnection]
commander.expectMsgType[Http.ServerBinding].connectionStream.produceTo(probe)
probe
}
val connectionStreamSub = connectionStream.expectSubscription()
def openNewClientConnection[T](settings: Option[ClientConnectionSettings] = None): (ProducerProbe[(HttpRequest, T)], ConsumerProbe[(HttpResponse, T)]) = {
val commander = TestProbe()
commander.send(IO(Http), Http.Connect(hostname, port, settings = settings, materializerSettings = materializerSettings))
val connection = commander.expectMsgType[Http.OutgoingConnection]
connection.remoteAddress.getPort shouldEqual port
connection.remoteAddress.getHostName shouldEqual hostname
val requestProducerProbe = StreamTestKit.producerProbe[(HttpRequest, T)]
val responseConsumerProbe = StreamTestKit.consumerProbe[(HttpResponse, T)]
requestProducerProbe.produceTo(connection.processor[T])
connection.processor[T].produceTo(responseConsumerProbe)
requestProducerProbe -> responseConsumerProbe
}
def acceptConnection(): (ConsumerProbe[HttpRequest], ProducerProbe[HttpResponse]) = {
connectionStreamSub.requestMore(1)
val Http.IncomingConnection(_, requestProducer, responseConsumer) = connectionStream.expectNext()
val requestConsumerProbe = StreamTestKit.consumerProbe[HttpRequest]
val responseProducerProbe = StreamTestKit.producerProbe[HttpResponse]
requestProducer.produceTo(requestConsumerProbe)
responseProducerProbe.produceTo(responseConsumer)
requestConsumerProbe -> responseProducerProbe
}
def openClientSocket() = new Socket(hostname, port)
def write(socket: Socket, data: String) = {
val writer = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream))
writer.write(data)
writer.flush()
writer
}
def readAll(socket: Socket)(reader: BufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream))): (String, BufferedReader) = {
val sb = new java.lang.StringBuilder
val cbuf = new Array[Char](256)
@tailrec def drain(): (String, BufferedReader) = reader.read(cbuf) match {
case -1 ⇒ sb.toString -> reader
case n ⇒ sb.append(cbuf, 0, n); drain()
}
drain()
}
}
def toStrict(entity: HttpEntity): HttpEntity.Strict =
Await.result(entity.toStrict(500.millis, materializer), 1.second)
}

View file

@ -0,0 +1,49 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.http
import com.typesafe.config.{ ConfigFactory, Config }
import scala.concurrent.Future
import scala.util.{ Failure, Success }
import scala.concurrent.duration._
import akka.util.Timeout
import akka.stream.{ MaterializerSettings, FlowMaterializer }
import akka.stream.scaladsl.Flow
import akka.io.IO
import akka.actor.ActorSystem
import akka.pattern.ask
import akka.http.model._
import HttpMethods._
object TestClient extends App {
val testConf: Config = ConfigFactory.parseString("""
akka.loglevel = INFO
akka.log-dead-letters = off
""")
implicit val system = ActorSystem("ServerTest", testConf)
import system.dispatcher
val materializer = FlowMaterializer(MaterializerSettings())
implicit val askTimeout: Timeout = 500.millis
val host = "spray.io"
println(s"Fetching HTTP server version of host `$host` ...")
val result = for {
connection ← IO(Http).ask(Http.Connect(host)).mapTo[Http.OutgoingConnection]
response ← sendRequest(HttpRequest(GET, uri = "/"), connection)
} yield response.header[headers.Server]
def sendRequest(request: HttpRequest, connection: Http.OutgoingConnection): Future[HttpResponse] = {
Flow(List(request -> 'NoContext)).produceTo(materializer, connection.processor)
Flow(connection.processor).map(_._1).toFuture(materializer)
}
result onComplete {
case Success(res) ⇒ println(s"$host is running ${res mkString ", "}")
case Failure(error) ⇒ println(s"Error: $error")
}
result onComplete { _ ⇒ system.shutdown() }
}

View file

@ -0,0 +1,24 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.http
import java.net.InetSocketAddress
import java.nio.channels.ServerSocketChannel
object TestUtils {
def temporaryServerAddress(interface: String = "127.0.0.1"): InetSocketAddress = {
val serverSocket = ServerSocketChannel.open()
try {
serverSocket.socket.bind(new InetSocketAddress(interface, 0))
val port = serverSocket.socket.getLocalPort
new InetSocketAddress(interface, port)
} finally serverSocket.close()
}
def temporaryServerHostnameAndPort(interface: String = "127.0.0.1"): (String, Int) = {
val socketAddress = temporaryServerAddress(interface)
socketAddress.getHostName -> socketAddress.getPort
}
}
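A short usage note (illustration only): the helper binds a throw-away socket to port 0 so the OS picks a free port, closes it, and returns the address; a test then binds the real server there, as `ClientServerSpec` above does. There is a small race window between closing the probe socket and re-binding, which is acceptable for tests. A hypothetical caller:

import akka.http.TestUtils._

object TestUtilsSketch extends App {
  // Reserve a free local port for a server under test (hypothetical scaffolding, not in this commit).
  val (hostname, port) = temporaryServerHostnameAndPort()
  println(s"binding the test server to $hostname:$port")
}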

View file

@ -75,8 +75,8 @@ class RequestParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll {
|Content-length: 17
|
|Shake your BOODY!""" should parseTo {
HttpRequest(POST, "/resource/yes", List(`Content-Length`(17), `Content-Type`(ContentTypes.`text/plain(UTF-8)`),
Connection("keep-alive"), `User-Agent`("curl/7.19.7 xyz")), "Shake your BOODY!", `HTTP/1.0`)
HttpRequest(POST, "/resource/yes", List(Connection("keep-alive"), `User-Agent`("curl/7.19.7 xyz")),
"Shake your BOODY!", `HTTP/1.0`)
}
closeAfterResponseCompletion shouldEqual Seq(false)
}
@ -90,9 +90,8 @@ class RequestParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll {
|Shake your BOODY!GET / HTTP/1.0
|
|""" should parseTo(
HttpRequest(POST, "/resource/yes", List(`Content-Length`(17), Connection("keep-alive"),
`User-Agent`("curl/7.19.7 xyz")), HttpEntity(ContentTypes.`application/octet-stream`, "Shake your BOODY!"),
`HTTP/1.0`),
HttpRequest(POST, "/resource/yes", List(Connection("keep-alive"), `User-Agent`("curl/7.19.7 xyz")),
"Shake your BOODY!".getBytes, `HTTP/1.0`),
HttpRequest(protocol = `HTTP/1.0`))
closeAfterResponseCompletion shouldEqual Seq(false, true)
}
@ -121,8 +120,7 @@ class RequestParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll {
|
|ABCDPATCH"""
}.toCharArray.map(_.toString).toSeq should rawMultiParseTo(
HttpRequest(PUT, "/resource/yes", List(Host("x"), `Content-Length`(4)),
HttpEntity(ContentTypes.`application/octet-stream`, "ABCD")))
HttpRequest(PUT, "/resource/yes", List(Host("x")), "ABCD".getBytes))
closeAfterResponseCompletion shouldEqual Seq(false)
}
@ -140,10 +138,7 @@ class RequestParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll {
|Content-Type: application/pdf
|Content-Length: 0
|
|""" should parseTo {
HttpRequest(GET, "/data", List(`Content-Length`(0), `Content-Type`(`application/pdf`), Host("x")),
HttpEntity(`application/pdf`, ""))
}
|""" should parseTo(HttpRequest(GET, "/data", List(Host("x")), HttpEntity(`application/pdf`, "")))
closeAfterResponseCompletion shouldEqual Seq(false)
}
@ -156,8 +151,7 @@ class RequestParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll {
|Host: ping
|
|"""
val baseRequest = HttpRequest(PATCH, "/data", List(Host("ping"), `Content-Type`(`application/pdf`),
Connection("lalelu"), `Transfer-Encoding`(TransferEncodings.chunked)))
val baseRequest = HttpRequest(PATCH, "/data", List(Host("ping"), Connection("lalelu")))
"request start" in new Test {
Seq(start, "rest") should generalMultiParseTo(
@ -219,8 +213,8 @@ class RequestParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll {
|Host: ping
|
|"""
val baseRequest = HttpRequest(PATCH, "/data", List(Host("ping"), Connection("lalelu"),
`Transfer-Encoding`(TransferEncodings.chunked)), HttpEntity.Chunked(`application/octet-stream`, producer()))
val baseRequest = HttpRequest(PATCH, "/data", List(Host("ping"), Connection("lalelu")),
HttpEntity.Chunked(`application/octet-stream`, producer()))
"an illegal char after chunk size" in new Test {
Seq(start,
@ -331,20 +325,18 @@ class RequestParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll {
override def afterAll() = system.shutdown()
private[http] class Test {
private class Test {
var closeAfterResponseCompletion = Seq.empty[Boolean]
def parseTo(expected: HttpRequest*): Matcher[String] =
multiParseTo(expected: _*).compose(_ :: Nil)
def multiParseTo(expected: HttpRequest*): Matcher[Seq[String]] = multiParseTo(newParser, expected: _*)
def multiParseTo(parser: HttpRequestParser, expected: HttpRequest*): Matcher[Seq[String]] =
rawMultiParseTo(parser, expected: _*).compose(_ map prep)
def rawMultiParseTo(expected: HttpRequest*): Matcher[Seq[String]] =
rawMultiParseTo(newParser, expected: _*)
def rawMultiParseTo(parser: HttpRequestParser, expected: HttpRequest*): Matcher[Seq[String]] =
generalRawMultiParseTo(parser, expected.map(Right(_)): _*)
@ -356,11 +348,9 @@ class RequestParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll {
def generalRawMultiParseTo(expected: Either[ParseError, HttpRequest]*): Matcher[Seq[String]] =
generalRawMultiParseTo(newParser, expected: _*)
def generalRawMultiParseTo(parser: HttpRequestParser,
expected: Either[ParseError, HttpRequest]*): Matcher[Seq[String]] =
equal(expected).matcher[Seq[Either[ParseError, HttpRequest]]] compose { input: Seq[String] ⇒
import HttpServerPipeline._
val future =
Flow(input.toList)
.map(ByteString.apply)
@ -368,9 +358,9 @@ class RequestParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll {
.splitWhen(_.isInstanceOf[ParserOutput.MessageStart])
.headAndTail(materializer)
.collect {
case (x: ParserOutput.RequestStart, entityParts) ⇒
closeAfterResponseCompletion :+= x.closeAfterResponseCompletion
Right(HttpServerPipeline.constructRequest(x, entityParts))
case (ParserOutput.RequestStart(method, uri, protocol, headers, createEntity, close), entityParts) ⇒
closeAfterResponseCompletion :+= close
Right(HttpRequest(method, uri, headers, createEntity(entityParts), protocol))
case (x: ParseError, _) ⇒ Left(x)
}
.map { x ⇒

View file

@ -0,0 +1,260 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.http.parsing
import com.typesafe.config.{ ConfigFactory, Config }
import scala.concurrent.{ Future, Await }
import scala.concurrent.duration._
import org.scalatest.{ Tag, BeforeAndAfterAll, FreeSpec, Matchers }
import org.scalatest.matchers.Matcher
import org.reactivestreams.api.Producer
import akka.stream.scaladsl.Flow
import akka.stream.impl.SynchronousProducerFromIterable
import akka.stream.{ FlattenStrategy, MaterializerSettings, FlowMaterializer }
import akka.util.ByteString
import akka.actor.ActorSystem
import akka.http.client.HttpClientPipeline
import akka.http.util._
import akka.http.model._
import headers._
import MediaTypes._
import HttpMethods._
import HttpProtocols._
import StatusCodes._
import HttpEntity._
import ParserOutput.ParseError
class ResponseParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll {
val testConf: Config = ConfigFactory.parseString("""
akka.event-handlers = ["akka.testkit.TestEventListener"]
akka.loglevel = WARNING
akka.http.parsing.max-response-reason-length = 21""")
implicit val system = ActorSystem(getClass.getSimpleName, testConf)
import system.dispatcher
val materializer = FlowMaterializer(MaterializerSettings())
val ServerOnTheMove = StatusCodes.registerCustom(331, "Server on the move")
"The response parsing logic should" - {
"properly parse" - {
// http://tools.ietf.org/html/rfc7230#section-3.3.3
"a 200 response to a HEAD request" in new Test {
"""HTTP/1.1 200 OK
|
|HTT""" should parseTo(HEAD, HttpResponse())
closeAfterResponseCompletion shouldEqual Seq(false)
}
// http://tools.ietf.org/html/rfc7230#section-3.3.3
"a 204 response" in new Test {
"""HTTP/1.1 204 OK
|
|""" should parseTo(HttpResponse(NoContent))
closeAfterResponseCompletion shouldEqual Seq(false)
}
"a response with a custom status code" in new Test {
"""HTTP/1.1 331 Server on the move
|Content-Length: 0
|
|""" should parseTo(HttpResponse(ServerOnTheMove))
closeAfterResponseCompletion shouldEqual Seq(false)
}
"a response with one header, a body, but no Content-Length header" in new Test {
"""HTTP/1.0 404 Not Found
|Host: api.example.com
|
|Foobs""" should parseTo(HttpResponse(NotFound, List(Host("api.example.com")), "Foobs".getBytes, `HTTP/1.0`))
closeAfterResponseCompletion shouldEqual Seq(true)
}
"a response with one header, no body, and no Content-Length header" in new Test {
"""HTTP/1.0 404 Not Found
|Host: api.example.com
|
|""" should parseTo(HttpResponse(NotFound, List(Host("api.example.com")),
HttpEntity.empty(ContentTypes.`application/octet-stream`), `HTTP/1.0`))
closeAfterResponseCompletion shouldEqual Seq(true)
}
"a response with 3 headers, a body and remaining content" in new Test {
"""HTTP/1.1 500 Internal Server Error
|User-Agent: curl/7.19.7 xyz
|Connection:close
|Content-Length: 17
|Content-Type: text/plain; charset=UTF-8
|
|Shake your BOODY!HTTP/1.""" should parseTo(HttpResponse(InternalServerError, List(Connection("close"),
`User-Agent`("curl/7.19.7 xyz")), "Shake your BOODY!"))
closeAfterResponseCompletion shouldEqual Seq(true)
}
"a split response (parsed byte-by-byte)" in new Test {
prep {
"""HTTP/1.1 200 Ok
|Content-Length: 4
|
|ABCD"""
}.toCharArray.map(_.toString).toSeq should rawMultiParseTo(HttpResponse(entity = "ABCD".getBytes))
closeAfterResponseCompletion shouldEqual Seq(false)
}
}
"properly parse a chunked" - {
val start =
"""HTTP/1.1 200 OK
|Transfer-Encoding: chunked
|Connection: lalelu
|Content-Type: application/pdf
|Server: spray-can
|
|"""
val baseResponse = HttpResponse(headers = List(Server("spray-can"), Connection("lalelu")))
"response start" in new Test {
Seq(start, "rest") should generalMultiParseTo(
Right(baseResponse.withEntity(HttpEntity.Chunked(`application/pdf`, producer()))),
Left("Illegal character 'r' in chunk start"))
closeAfterResponseCompletion shouldEqual Seq(false)
}
"message chunk with and without extension" in new Test {
Seq(start +
"""3
|abc
|10;some=stuff;bla
|0123456789ABCDEF
|""",
"10;foo=",
"""bar
|0123456789ABCDEF
|10
|0123456789""",
"""ABCDEF
|dead""") should generalMultiParseTo(
Right(baseResponse.withEntity(HttpEntity.Chunked(`application/pdf`, producer(
HttpEntity.Chunk(ByteString("abc")),
HttpEntity.Chunk(ByteString("0123456789ABCDEF"), "some=stuff;bla"),
HttpEntity.Chunk(ByteString("0123456789ABCDEF"), "foo=bar"),
HttpEntity.Chunk(ByteString("0123456789ABCDEF"), ""))))))
closeAfterResponseCompletion shouldEqual Seq(false)
}
"message end" in new Test {
Seq(start,
"""0
|
|""") should generalMultiParseTo(
Right(baseResponse.withEntity(HttpEntity.Chunked(`application/pdf`, producer(HttpEntity.LastChunk)))))
closeAfterResponseCompletion shouldEqual Seq(false)
}
"message end with extension, trailer and remaining content" in new Test {
Seq(start,
"""000;nice=true
|Foo: pip
| apo
|Bar: xyz
|
|HT""") should generalMultiParseTo(
Right(baseResponse.withEntity(HttpEntity.Chunked(`application/pdf`,
producer(HttpEntity.LastChunk("nice=true", List(RawHeader("Bar", "xyz"), RawHeader("Foo", "pip apo"))))))))
closeAfterResponseCompletion shouldEqual Seq(false)
}
}
"reject a response with" - {
"HTTP version 1.2" in new Test {
Seq("HTTP/1.2 200 OK\r\n") should generalMultiParseTo(Left("The server-side HTTP version is not supported"))
}
"an illegal status code" in new Test {
Seq("HTTP/1", ".1 2000 Something") should generalMultiParseTo(Left("Illegal response status code"))
}
"a too-long response status reason" in new Test {
Seq("HTTP/1.1 204 12345678", "90123456789012\r\n") should generalMultiParseTo(
Left("Response reason phrase exceeds the configured limit of 21 characters"))
}
}
}
override def afterAll() = system.shutdown()
private class Test {
var closeAfterResponseCompletion = Seq.empty[Boolean]
def parseTo(expected: HttpResponse*): Matcher[String] = parseTo(GET, expected: _*)
def parseTo(requestMethod: HttpMethod, expected: HttpResponse*): Matcher[String] =
multiParseTo(requestMethod, expected: _*).compose(_ :: Nil)
def multiParseTo(expected: HttpResponse*): Matcher[Seq[String]] = multiParseTo(GET, expected: _*)
def multiParseTo(requestMethod: HttpMethod, expected: HttpResponse*): Matcher[Seq[String]] =
rawMultiParseTo(requestMethod, expected: _*).compose(_ map prep)
def rawMultiParseTo(expected: HttpResponse*): Matcher[Seq[String]] = rawMultiParseTo(GET, expected: _*)
def rawMultiParseTo(requestMethod: HttpMethod, expected: HttpResponse*): Matcher[Seq[String]] =
generalRawMultiParseTo(requestMethod, expected.map(Right(_)): _*)
def parseToError(error: String): Matcher[String] = generalMultiParseTo(Left(error)).compose(_ :: Nil)
def generalMultiParseTo(expected: Either[String, HttpResponse]*): Matcher[Seq[String]] =
generalRawMultiParseTo(expected: _*).compose(_ map prep)
def generalRawMultiParseTo(expected: Either[String, HttpResponse]*): Matcher[Seq[String]] =
generalRawMultiParseTo(GET, expected: _*)
def generalRawMultiParseTo(requestMethod: HttpMethod, expected: Either[String, HttpResponse]*): Matcher[Seq[String]] =
equal(expected).matcher[Seq[Either[String, HttpResponse]]] compose {
input: Seq[String] ⇒
val future =
Flow(input.toList)
.map(ByteString.apply)
.transform(newParser(requestMethod))
.splitWhen(_.isInstanceOf[ParserOutput.MessageStart])
.headAndTail(materializer)
.collect {
case (ParserOutput.ResponseStart(statusCode, protocol, headers, createEntity, close), entityParts) ⇒
closeAfterResponseCompletion :+= close
Right(HttpResponse(statusCode, headers, createEntity(entityParts), protocol))
case (x: ParseError, _) ⇒ Left(x)
}.map { x ⇒
Flow {
x match {
case Right(response) ⇒ compactEntity(response.entity).map(x ⇒ Right(response.withEntity(x)))
case Left(error) ⇒ Future.successful(Left(error.info.formatPretty))
}
}.toProducer(materializer)
}
.flatten(FlattenStrategy.concat)
.grouped(1000).toFuture(materializer)
Await.result(future, 250.millis)
}
def newParser(requestMethod: HttpMethod = GET) = {
val parser = new HttpResponseParser(ParserSettings(system), materializer,
dequeueRequestMethodForNextResponse = () ⇒ requestMethod)()
parser
}
private def compactEntity(entity: HttpEntity): Future[HttpEntity] =
entity match {
case x: HttpEntity.Chunked ⇒ compactEntityChunks(x.chunks).map(compacted ⇒ x.copy(chunks = compacted))
case _ ⇒ entity.toStrict(250.millis, materializer)
}
private def compactEntityChunks(data: Producer[ChunkStreamPart]): Future[Producer[ChunkStreamPart]] =
Flow(data).grouped(1000).toFuture(materializer)
.map(producer(_: _*))
.recover {
case _: NoSuchElementException ⇒ producer[ChunkStreamPart]()
}
def prep(response: String) = response.stripMarginWithNewline("\r\n")
def producer[T](elems: T*): Producer[T] = SynchronousProducerFromIterable(elems.toList)
}
}
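Aside for readers of the chunked-parsing fixtures above: each chunk is introduced by a header line of the form <hex-size>[;extensions]. The following is only an illustrative sketch of how such a line decomposes (ChunkHeaderSketch and splitChunkHeader are made-up names, not akka-http-core code):
// Illustrative only — splitChunkHeader is a made-up helper, not part of this commit.
object ChunkHeaderSketch extends App {
  def splitChunkHeader(line: String): (Int, String) = {
    val (sizePart, rest) = line.span(_ != ';')
    (Integer.parseInt(sizePart.trim, 16), rest.stripPrefix(";"))
  }
  println(splitChunkHeader("3"))                 // (3,  "")               -> 3-byte chunk "abc"
  println(splitChunkHeader("10;some=stuff;bla")) // (16, "some=stuff;bla") -> 0x10 = 16 bytes
  println(splitChunkHeader("10;foo=bar"))        // (16, "foo=bar")
}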

View file

@@ -0,0 +1,199 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.http.rendering
import com.typesafe.config.{ Config, ConfigFactory }
import java.net.InetSocketAddress
import scala.concurrent.duration._
import scala.concurrent.Await
import org.scalatest.{ FreeSpec, Matchers, BeforeAndAfterAll }
import org.scalatest.matchers.Matcher
import akka.actor.ActorSystem
import akka.event.NoLogging
import akka.http.model._
import akka.http.model.headers._
import akka.http.util._
import akka.stream.scaladsl.Flow
import akka.stream.{ MaterializerSettings, FlowMaterializer }
import akka.stream.impl.SynchronousProducerFromIterable
import HttpEntity._
import HttpMethods._
class RequestRendererSpec extends FreeSpec with Matchers with BeforeAndAfterAll {
val testConf: Config = ConfigFactory.parseString("""
akka.event-handlers = ["akka.testkit.TestEventListener"]
akka.loglevel = WARNING""")
implicit val system = ActorSystem(getClass.getSimpleName, testConf)
import system.dispatcher
val materializer = FlowMaterializer(MaterializerSettings())
"The request preparation logic should" - {
"properly render an unchunked" - {
"GET request without headers and without body" in new TestSetup() {
HttpRequest(GET, "/abc") should renderTo {
"""GET /abc HTTP/1.1
|Host: test.com:8080
|User-Agent: spray-can/1.0.0
|
|"""
}
}
"GET request with a URI that requires encoding" in new TestSetup() {
HttpRequest(GET, "/abc<def") should renderTo {
"""GET /abc%3Cdef HTTP/1.1
|Host: test.com:8080
|User-Agent: spray-can/1.0.0
|
|"""
}
}
"POST request, a few headers (incl. a custom Host header) and no body" in new TestSetup() {
HttpRequest(POST, "/abc/xyz", List(
RawHeader("X-Fancy", "naa"),
RawHeader("Age", "0"),
Host("spray.io", 9999))) should renderTo {
"""POST /abc/xyz HTTP/1.1
|X-Fancy: naa
|Age: 0
|Host: spray.io:9999
|User-Agent: spray-can/1.0.0
|Content-Length: 0
|
|"""
}
}
"PUT request, a few headers and a body" in new TestSetup() {
HttpRequest(PUT, "/abc/xyz", List(
RawHeader("X-Fancy", "naa"),
RawHeader("Cache-Control", "public"),
Host("spray.io"))).withEntity("The content please!") should renderTo {
"""PUT /abc/xyz HTTP/1.1
|X-Fancy: naa
|Cache-Control: public
|Host: spray.io
|User-Agent: spray-can/1.0.0
|Content-Type: text/plain; charset=UTF-8
|Content-Length: 19
|
|The content please!"""
}
}
"PUT request, a few headers and a body with suppressed content type" in new TestSetup() {
HttpRequest(PUT, "/abc/xyz", List(
RawHeader("X-Fancy", "naa"),
RawHeader("Cache-Control", "public"),
Host("spray.io"))).withEntity(HttpEntity(ContentTypes.NoContentType, "The content please!")) should renderTo {
"""PUT /abc/xyz HTTP/1.1
|X-Fancy: naa
|Cache-Control: public
|Host: spray.io
|User-Agent: spray-can/1.0.0
|Content-Length: 19
|
|The content please!"""
}
}
}
"proper render a chunked" - {
"PUT request with empty chunk stream and custom Content-Type" in new TestSetup() {
HttpRequest(PUT, "/abc/xyz").withEntity(Chunked(ContentTypes.`text/plain`, producer())) should renderTo {
"""PUT /abc/xyz HTTP/1.1
|Host: test.com:8080
|User-Agent: spray-can/1.0.0
|Content-Type: text/plain
|Content-Length: 0
|
|"""
}
}
"POST request with body" in new TestSetup() {
HttpRequest(POST, "/abc/xyz")
.withEntity(Chunked(ContentTypes.`text/plain`, producer("XXXX", "ABCDEFGHIJKLMNOPQRSTUVWXYZ"))) should renderTo {
"""POST /abc/xyz HTTP/1.1
|Host: test.com:8080
|User-Agent: spray-can/1.0.0
|Content-Type: text/plain
|Transfer-Encoding: chunked
|
|4
|XXXX
|1a
|ABCDEFGHIJKLMNOPQRSTUVWXYZ
|0
|
|"""
}
}
}
"properly handle the User-Agent header" - {
"if no default is set and no explicit User-Agent header given" in new TestSetup(None) {
HttpRequest(GET, "/abc") should renderTo {
"""GET /abc HTTP/1.1
|Host: test.com:8080
|
|"""
}
}
"if a default is set but an explicit User-Agent header given" in new TestSetup() {
HttpRequest(GET, "/abc", List(`User-Agent`("user-ua/1.0"))) should renderTo {
"""GET /abc HTTP/1.1
|User-Agent: user-ua/1.0
|Host: test.com:8080
|
|"""
}
}
}
"properly use URI from Raw-Request-URI header if present" - {
"GET request with Raw-Request-URI" in new TestSetup() {
HttpRequest(GET, "/abc", List(`Raw-Request-URI`("/def"))) should renderTo {
"""GET /def HTTP/1.1
|Host: test.com:8080
|User-Agent: spray-can/1.0.0
|
|"""
}
}
"GET request with Raw-Request-URI sends raw URI even with invalid utf8 characters" in new TestSetup() {
HttpRequest(GET, "/abc", List(`Raw-Request-URI`("/def%80%fe%ff"))) should renderTo {
"""GET /def%80%fe%ff HTTP/1.1
|Host: test.com:8080
|User-Agent: spray-can/1.0.0
|
|"""
}
}
}
}
override def afterAll() = system.shutdown()
class TestSetup(val userAgent: Option[`User-Agent`] = Some(`User-Agent`("spray-can/1.0.0")),
serverAddress: InetSocketAddress = new InetSocketAddress("test.com", 8080))
extends HttpRequestRendererFactory(userAgent, requestHeaderSizeHint = 64, materializer, NoLogging) {
def renderTo(expected: String): Matcher[HttpRequest] =
equal(expected.stripMarginWithNewline("\r\n")).matcher[String] compose { request ⇒
val renderer = newRenderer
val byteStringProducer :: Nil = renderer.onNext(RequestRenderingContext(request, serverAddress))
val future = Flow(byteStringProducer).grouped(1000).toFuture(materializer).map(_.reduceLeft(_ ++ _).utf8String)
Await.result(future, 250.millis)
}
}
def producer[T](elems: T*) = SynchronousProducerFromIterable(elems.toList)
}
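Aside on the chunked request body rendered above: each chunk is framed as <hex-length>, CRLF, data, CRLF, and the stream ends with a zero-length chunk, which is why "XXXX" gets the prefix 4 and the 26-character alphabet gets 1a. A minimal sketch of that framing under assumed names (frame is not the renderer's API):
// Illustrative only — `frame` is a made-up helper, not HttpRequestRendererFactory code.
object ChunkFramingSketch extends App {
  def frame(chunks: Seq[String]): String =
    chunks.map(c ⇒ f"${c.length}%x\r\n$c\r\n").mkString + "0\r\n\r\n"
  // Prints: 4 CRLF XXXX CRLF 1a CRLF ABCDEFGHIJKLMNOPQRSTUVWXYZ CRLF 0 CRLF CRLF
  print(frame(Seq("XXXX", "ABCDEFGHIJKLMNOPQRSTUVWXYZ")))
}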

View file

@@ -46,10 +46,10 @@ class ResponseRendererSpec extends FreeSpec with Matchers with BeforeAndAfterAll
"with status 304, a few headers and no body" in new TestSetup() {
HttpResponse(304, List(RawHeader("X-Fancy", "of course"), RawHeader("Age", "0"))) should renderTo {
"""HTTP/1.1 304 Not Modified
|Server: akka-http/1.0.0
|Date: Thu, 25 Aug 2011 09:10:29 GMT
|X-Fancy: of course
|Age: 0
|Server: akka-http/1.0.0
|Date: Thu, 25 Aug 2011 09:10:29 GMT
|Content-Length: 0
|
|"""
@@ -73,9 +73,9 @@ class ResponseRendererSpec extends FreeSpec with Matchers with BeforeAndAfterAll
headers = List(RawHeader("Age", "30"), Connection("Keep-Alive")),
entity = "Small f*ck up overhere!")) should renderTo(
"""HTTP/1.1 200 OK
|Age: 30
|Server: akka-http/1.0.0
|Date: Thu, 25 Aug 2011 09:10:29 GMT
|Age: 30
|Content-Type: text/plain; charset=UTF-8
|Content-Length: 23
|
@@ -87,9 +87,9 @@ class ResponseRendererSpec extends FreeSpec with Matchers with BeforeAndAfterAll
"with status 400, a few headers and a body" in new TestSetup() {
HttpResponse(400, List(RawHeader("Age", "30"), Connection("Keep-Alive")), "Small f*ck up overhere!") should renderTo {
"""HTTP/1.1 400 Bad Request
|Age: 30
|Server: akka-http/1.0.0
|Date: Thu, 25 Aug 2011 09:10:29 GMT
|Age: 30
|Content-Type: text/plain; charset=UTF-8
|Content-Length: 23
|
@@ -101,9 +101,9 @@ class ResponseRendererSpec extends FreeSpec with Matchers with BeforeAndAfterAll
HttpResponse(400, List(RawHeader("Age", "30"), Connection("Keep-Alive")),
HttpEntity(contentType = ContentTypes.NoContentType, "Small f*ck up overhere!")) should renderTo {
"""HTTP/1.1 400 Bad Request
|Age: 30
|Server: akka-http/1.0.0
|Date: Thu, 25 Aug 2011 09:10:29 GMT
|Age: 30
|Content-Length: 23
|
|Small f*ck up overhere!"""
@@ -115,9 +115,9 @@ class ResponseRendererSpec extends FreeSpec with Matchers with BeforeAndAfterAll
HttpResponse(400, List(RawHeader("Age", "30"), Connection("Keep-Alive")),
entity = Default(contentType = ContentTypes.`text/plain(UTF-8)`, 23, producer(ByteString("Small f*ck up overhere!")))) should renderTo {
"""HTTP/1.1 400 Bad Request
|Age: 30
|Server: akka-http/1.0.0
|Date: Thu, 25 Aug 2011 09:10:29 GMT
|Age: 30
|Content-Type: text/plain; charset=UTF-8
|Content-Length: 23
|
@@ -128,14 +128,14 @@ class ResponseRendererSpec extends FreeSpec with Matchers with BeforeAndAfterAll
the[RuntimeException] thrownBy {
HttpResponse(200, entity = Default(ContentTypes.`application/json`, 10,
producer(ByteString("body123")))) should renderTo("")
} should have message "Response had declared Content-Length 10 but entity chunk stream amounts to 3 bytes less"
} should have message "HTTP message had declared Content-Length 10 but entity chunk stream amounts to 3 bytes less"
}
"with one chunk and incorrect (too small) Content-Length" in new TestSetup() {
the[RuntimeException] thrownBy {
HttpResponse(200, entity = Default(ContentTypes.`application/json`, 5,
producer(ByteString("body123")))) should renderTo("")
} should have message "Response had declared Content-Length 5 but entity chunk stream amounts to more bytes"
} should have message "HTTP message had declared Content-Length 5 but entity chunk stream amounts to more bytes"
}
}
@@ -169,11 +169,24 @@ class ResponseRendererSpec extends FreeSpec with Matchers with BeforeAndAfterAll
"a chunked response" - {
"with empty entity" in new TestSetup() {
HttpResponse(200, List(RawHeader("Age", "30")),
Chunked(ContentTypes.`application/json`, producer())) should renderTo {
Chunked(ContentTypes.NoContentType, producer())) should renderTo {
"""HTTP/1.1 200 OK
|Age: 30
|Server: akka-http/1.0.0
|Date: Thu, 25 Aug 2011 09:10:29 GMT
|
|"""
}
}
"with empty entity but non-default Content-Type" in new TestSetup() {
HttpResponse(200, List(RawHeader("Age", "30")),
Chunked(ContentTypes.`application/json`, producer())) should renderTo {
"""HTTP/1.1 200 OK
|Age: 30
|Server: akka-http/1.0.0
|Date: Thu, 25 Aug 2011 09:10:29 GMT
|Content-Type: application/json; charset=UTF-8
|
|"""
}
@@ -181,7 +194,7 @@ class ResponseRendererSpec extends FreeSpec with Matchers with BeforeAndAfterAll
"with one chunk and no explicit LastChunk" in new TestSetup() {
HttpResponse(entity = Chunked(ContentTypes.`text/plain(UTF-8)`,
producer(Chunk(ByteString("Yahoooo"))))) should renderTo {
producer("Yahoooo"))) should renderTo {
"""HTTP/1.1 200 OK
|Server: akka-http/1.0.0
|Date: Thu, 25 Aug 2011 09:10:29 GMT
@@ -222,7 +235,7 @@ class ResponseRendererSpec extends FreeSpec with Matchers with BeforeAndAfterAll
ResponseRenderingContext(
requestProtocol = HttpProtocols.`HTTP/1.0`,
response = HttpResponse(entity = Chunked(ContentTypes.`application/json`,
producer(Chunk(ByteString("abc")), Chunk(ByteString("defg")))))) should renderTo(
producer(Chunk("abc"), Chunk("defg"))))) should renderTo(
"""HTTP/1.1 200 OK
|Server: akka-http/1.0.0
|Date: Thu, 25 Aug 2011 09:10:29 GMT
@@ -246,6 +259,28 @@ class ResponseRendererSpec extends FreeSpec with Matchers with BeforeAndAfterAll
}
}
"properly handle the Server header" - {
"if no default is set and no explicit Server header given" in new TestSetup(None) {
HttpResponse(200) should renderTo {
"""HTTP/1.1 200 OK
|Date: Thu, 25 Aug 2011 09:10:29 GMT
|Content-Length: 0
|
|"""
}
}
"if a default is set but an explicit Server header given" in new TestSetup() {
HttpResponse(200, List(Server("server/1.0"))) should renderTo {
"""HTTP/1.1 200 OK
|Server: server/1.0
|Date: Thu, 25 Aug 2011 09:10:29 GMT
|Content-Length: 0
|
|"""
}
}
}
"The 'Connection' header should be rendered correctly" in new TestSetup() {
import org.scalatest.prop.TableDrivenPropertyChecks._
import HttpProtocols._
@@ -291,18 +326,17 @@ class ResponseRendererSpec extends FreeSpec with Matchers with BeforeAndAfterAll
override def afterAll() = system.shutdown()
class TestSetup(val serverHeaderValue: String = "akka-http/1.0.0",
class TestSetup(val serverHeader: Option[Server] = Some(Server("akka-http/1.0.0")),
val transparentHeadRequests: Boolean = true)
extends HttpResponseRendererFactory(serverHeaderValue.toOption.map(Server(_)),
responseHeaderSizeHint = 64, materializer, NoLogging) {
extends HttpResponseRendererFactory(serverHeader, responseHeaderSizeHint = 64, materializer, NoLogging) {
def renderTo(expected: String): Matcher[HttpResponse] =
renderTo(expected, close = false) compose (ResponseRenderingContext(_))
def renderTo(expected: String, close: Boolean): Matcher[ResponseRenderingContext] =
equal(expected.stripMarginWithNewline("\r\n") -> close).matcher[(String, Boolean)] compose { input ⇒
equal(expected.stripMarginWithNewline("\r\n") -> close).matcher[(String, Boolean)] compose { ctx ⇒
val renderer = newRenderer
val byteStringProducer :: Nil = renderer.onNext(input)
val byteStringProducer :: Nil = renderer.onNext(ctx)
val future = Flow(byteStringProducer).grouped(1000).toFuture(materializer).map(_.reduceLeft(_ ++ _).utf8String)
Await.result(future, 250.millis) -> renderer.isComplete
}
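Aside on the fixtures in both renderer specs and in the parser spec's prep helper: the expected strings are run through stripMarginWithNewline("\r\n") so that the margin-stripped text is compared with CRLF line endings, matching the raw HTTP wire format. A rough, assumed-equivalent sketch of the effect (this is not the akka.http.util implementation):
// Rough sketch of the effect only — assumed equivalent, not copied from akka.http.util.
object StripMarginSketch extends App {
  def stripMarginWithNewline(s: String, newline: String): String =
    s.stripMargin.replace("\n", newline)
  println(stripMarginWithNewline("HTTP/1.1 200 OK\n    |Content-Length: 0\n    |\n    |", "\r\n")
    .replace("\r\n", "\\r\\n")) // HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n
}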