diff --git a/akka-http-core/src/main/resources/reference.conf b/akka-http-core/src/main/resources/reference.conf index 775a5f5b20..adf6a0d272 100644 --- a/akka-http-core/src/main/resources/reference.conf +++ b/akka-http-core/src/main/resources/reference.conf @@ -8,72 +8,20 @@ akka.http { server { - # The value of the `Server` header to produce. - # Set to the empty string to disable rendering of the server header. + # The default value of the `Server` header to produce if no + # explicit `Server`-header was included in a response. + # If this value is the empty string and no header was included in + # the request, no `Server` header will be rendered at all. server-header = akka-http/${akka.version} - # Enables/disables SSL encryption. - # If enabled the server uses the implicit `ServerSSLEngineProvider` member - # of the `Bind` command to create `SSLEngine` instances for the underlying - # IO connection. - ssl-encryption = off - - # The maximum number of requests that are accepted (and dispatched to - # the application) on one single connection before the first request - # has to be completed. - # Incoming requests that would cause the pipelining limit to be exceeded - # are not read from the connections socket so as to build up "back-pressure" - # to the client via TCP flow control. - # A setting of 1 disables HTTP pipelining, since only one request per - # connection can be "open" (i.e. being processed by the application) at any - # time. Set to higher values to enable HTTP pipelining. - # Set to 'disabled' for completely disabling pipelining limits - # (not recommended on public-facing servers due to risk of DoS attacks). - # This value must be > 0 and <= 128. - pipelining-limit = 1 - # The time after which an idle connection will be automatically closed. # Set to `infinite` to completely disable idle connection timeouts. idle-timeout = 60 s - # If a request hasn't been responded to after the time period set here - # a `akka.http.Timedout` message will be sent to the timeout handler. - # Set to `infinite` to completely disable request timeouts. - request-timeout = 20 s - - # After a `Timedout` message has been sent to the timeout handler and the - # request still hasn't been completed after the time period set here - # the server will complete the request itself with an error response. - # Set to `infinite` to disable timeout timeouts. - timeout-timeout = 2 s - - # The path of the actor to send `akka.http.Timedout` messages to. - # If empty all `Timedout` messages will go to the "regular" request - # handling actor. - timeout-handler = "" - # The time period within which the TCP binding process must be completed. # Set to `infinite` to disable. bind-timeout = 1s - # The time period within which the TCP unbinding process must be completed. - # Set to `infinite` to disable. - unbind-timeout = 1s - - # The time after which a connection is aborted (RST) after a parsing error - # occurred. The timeout prevents a connection which is already known to be - # erroneous from receiving evermore data even if all of the data will be ignored. - # However, in case of a connection abortion the client usually doesn't properly - # receive the error response. This timeout is a trade-off which allows the client - # some time to finish its request and receive a proper error response before the - # connection is forcibly closed to free resources. - parse-error-abort-timeout = 2s - - # The "granularity" of timeout checking for both idle connections timeouts - # as well as request timeouts, should rarely be needed to modify. - # If set to `infinite` request and connection timeout checking is disabled. - reaping-cycle = 250 ms - # Enables/disables the addition of a `Remote-Address` header # holding the clients (remote) IP address. remote-address-header = off @@ -103,12 +51,6 @@ akka.http { # doesn't have to be fiddled with in most applications. response-header-size-hint = 512 - # For HTTPS connections this setting specifies the maximum number of - # bytes that are encrypted in one go. Large responses are broken down in - # chunks of this size so as to already begin sending before the response has - # been encrypted entirely. - max-encryption-chunk-size = 1m - # If this setting is empty the server only accepts requests that carry a # non-empty `Host` header. Otherwise it responds with `400 Bad Request`. # Set to a non-empty value to be used in lieu of a missing or empty `Host` @@ -131,45 +73,17 @@ akka.http { user-agent-header = akka-http/${akka.version} # The time period within which the TCP connecting process must be completed. - # Set to `infinite` to disable. connecting-timeout = 10s # The time after which an idle connection will be automatically closed. # Set to `infinite` to completely disable idle timeouts. idle-timeout = 60 s - # The max time period that a client connection will be waiting for a response - # before triggering a request timeout. The timer for this logic is not started - # until the connection is actually in a state to receive the response, which - # may be quite some time after the request has been received from the - # application! - # There are two main reasons to delay the start of the request timeout timer: - # 1. On the host-level API with pipelining disabled: - # If the request cannot be sent immediately because all connections are - # currently busy with earlier requests it has to be queued until a - # connection becomes available. - # 2. With pipelining enabled: - # The request timeout timer starts only once the response for the - # preceding request on the connection has arrived. - # Set to `infinite` to completely disable request timeouts. - request-timeout = 20 s - - # the "granularity" of timeout checking for both idle connections timeouts - # as well as request timeouts, should rarely be needed to modify. - # If set to `infinite` request and connection timeout checking is disabled. - reaping-cycle = 250 ms - # The initial size of the buffer to render the request headers in. # Can be used for fine-tuning request rendering performance but probably # doesn't have to be fiddled with in most applications. request-header-size-hint = 512 - # For HTTPS connections this setting specified the maximum number of - # bytes that are encrypted in one go. Large requests are broken down in - # chunks of this size so as to already begin sending before the request has - # been encrypted entirely. - max-encryption-chunk-size = 1m - # The proxy configurations to be used for requests with the specified # scheme. proxy { @@ -193,37 +107,6 @@ akka.http { parsing = ${akka.http.parsing} } - host-connector { - # The maximum number of parallel connections that an `HttpHostConnector` - # is allowed to establish to a host. Must be > 0. - max-connections = 4 - - # The maximum number of times an `HttpHostConnector` attempts to repeat - # failed requests (if the request can be safely retried) before - # giving up and returning an error. - max-retries = 5 - - # Configures redirection following. - # If set to zero redirection responses will not be followed, i.e. they'll be returned to the user as is. - # If set to a value > zero redirection responses will be followed up to the given number of times. - # If the redirection chain is longer than the configured value the first redirection response that is - # is not followed anymore is returned to the user as is. - max-redirects = 0 - - # If this setting is enabled, the `HttpHostConnector` pipelines requests - # across connections, otherwise only one single request can be "open" - # on a particular HTTP connection. - pipelining = off - - # The time after which an idle `HttpHostConnector` (without open - # connections) will automatically terminate itself. - # Set to `infinite` to completely disable idle timeouts. - idle-timeout = 30 s - - # Modify to tweak client settings for this host-connector only. - client = ${akka.http.client} - } - # The (default) configuration of the HTTP message parser for the server and the client. # IMPORTANT: These settings (i.e. children of `akka.http.parsing`) can't be directly # overridden in `application.conf` to change the parser settings for client and server @@ -265,11 +148,6 @@ akka.http { # provided as a `RawHeader`. illegal-header-warnings = on - # Enables/disables inclusion of an SSL-Session-Info header in parsed - # messages over SSL transports (i.e., HttpRequest on server side and - # HttpResponse on client side). - ssl-session-info-header = off - # limits for the number of different values per header type that the # header cache will hold header-cache { @@ -288,20 +166,4 @@ akka.http { # Fully qualified config path which holds the dispatcher configuration # to be used for the HttpManager. manager-dispatcher = "akka.actor.default-dispatcher" - - # Fully qualified config path which holds the dispatcher configuration - # to be used for the HttpClientSettingsGroup actors. - settings-group-dispatcher = "akka.actor.default-dispatcher" - - # Fully qualified config path which holds the dispatcher configuration - # to be used for the HttpHostConnector actors. - host-connector-dispatcher = "akka.actor.default-dispatcher" - - # Fully qualified config path which holds the dispatcher configuration - # to be used for HttpListener actors. - listener-dispatcher = "akka.actor.default-dispatcher" - - # Fully qualified config path which holds the dispatcher configuration - # to be used for HttpServerConnection and HttpClientConnection actors. - connection-dispatcher = "akka.actor.default-dispatcher" -} +} \ No newline at end of file diff --git a/akka-http-core/src/main/scala/akka/http/Http.scala b/akka-http-core/src/main/scala/akka/http/Http.scala index 6bf093a318..ac41267a26 100644 --- a/akka-http-core/src/main/scala/akka/http/Http.scala +++ b/akka-http-core/src/main/scala/akka/http/Http.scala @@ -7,13 +7,12 @@ package akka.http import java.net.InetSocketAddress import com.typesafe.config.Config import org.reactivestreams.api.{ Producer, Consumer } -import scala.concurrent.duration.Duration import scala.collection.immutable -import akka.io.{ Inet, Tcp } +import akka.io.Inet import akka.stream.MaterializerSettings -import akka.http.client.{ HostConnectorSettings, ClientConnectionSettings } +import akka.http.client.{ HttpClientProcessor, ClientConnectionSettings } import akka.http.server.ServerSettings -import akka.http.model.{ HttpResponse, HttpRequest, HttpHeader } +import akka.http.model.{ HttpResponse, HttpRequest } import akka.http.util._ import akka.actor._ @@ -22,63 +21,81 @@ object Http extends ExtensionKey[HttpExt] { /** * Command that can be sent to `IO(Http)` to trigger the setup of an HTTP client facility at * a certain API level (connection, host or request). - * The HTTP layer will respond with an `OutgoingHttpChannelInfo` reply (or `Status.Failure`). + * The HTTP layer will respond with an `Http.OutgoingChannel` reply (or `Status.Failure`). * The sender `ActorRef`of this response can then be sent `HttpRequest` instances to which * it will respond with `HttpResponse` instances (or `Status.Failure`). */ - sealed trait OutgoingHttpChannelSetup + sealed trait SetupOutgoingChannel final case class Connect(remoteAddress: InetSocketAddress, - sslEncryption: Boolean, localAddress: Option[InetSocketAddress], options: immutable.Traversable[Inet.SocketOption], - settings: Option[ClientConnectionSettings]) extends OutgoingHttpChannelSetup + settings: Option[ClientConnectionSettings], + materializerSettings: MaterializerSettings) extends SetupOutgoingChannel object Connect { - def apply(host: String, port: Int = 80, sslEncryption: Boolean = false, localAddress: Option[InetSocketAddress] = None, - options: immutable.Traversable[Inet.SocketOption] = Nil, settings: Option[ClientConnectionSettings] = None): Connect = - apply(new InetSocketAddress(host, port), sslEncryption, localAddress, options, settings) + def apply(host: String, port: Int = 80, + localAddress: Option[InetSocketAddress] = None, + options: immutable.Traversable[Inet.SocketOption] = Nil, + settings: Option[ClientConnectionSettings] = None, + materializerSettings: MaterializerSettings = MaterializerSettings()): Connect = + apply(new InetSocketAddress(host, port), localAddress, options, settings, materializerSettings) } - case class HostConnectorSetup(host: String, port: Int = 80, - sslEncryption: Boolean = false, - options: immutable.Traversable[Inet.SocketOption] = Nil, - settings: Option[HostConnectorSettings] = None, - connectionType: ClientConnectionType = ClientConnectionType.AutoProxied, - defaultHeaders: immutable.Seq[HttpHeader] = Nil) extends OutgoingHttpChannelSetup { - private[http] def normalized(implicit refFactory: ActorRefFactory) = - if (settings.isDefined) this - else copy(settings = Some(HostConnectorSettings(actorSystem))) - } - object HostConnectorSetup { - def apply(host: String, port: Int, sslEncryption: Boolean)(implicit refFactory: ActorRefFactory): HostConnectorSetup = - apply(host, port, sslEncryption).normalized - } + // PREVIEW OF COMING API HERE: + // + // case class SetupHostConnector(host: String, port: Int = 80, + // options: immutable.Traversable[Inet.SocketOption] = Nil, + // settings: Option[HostConnectorSettings] = None, + // connectionType: ClientConnectionType = ClientConnectionType.AutoProxied, + // defaultHeaders: immutable.Seq[HttpHeader] = Nil) extends SetupOutgoingChannel { + // private[http] def normalized(implicit refFactory: ActorRefFactory) = + // if (settings.isDefined) this + // else copy(settings = Some(HostConnectorSettings(actorSystem))) + // } + // object SetupHostConnector { + // def apply(host: String, port: Int, sslEncryption: Boolean)(implicit refFactory: ActorRefFactory): SetupHostConnector = + // apply(host, port, sslEncryption).normalized + // } + // sealed trait ClientConnectionType + // object ClientConnectionType { + // object Direct extends ClientConnectionType + // object AutoProxied extends ClientConnectionType + // final case class Proxied(proxyHost: String, proxyPort: Int) extends ClientConnectionType + // } + // + // case object SetupRequestChannel extends SetupOutgoingChannel - case object HttpRequestChannelSetup extends OutgoingHttpChannelSetup - - final case class OpenOutgoingHttpChannel(channelSetup: OutgoingHttpChannelSetup) + sealed trait OutgoingChannel { + def processor[T]: HttpClientProcessor[T] + } /** - * Command triggering the shutdown of the respective HTTP channel. - * - * If sent to - * - client-side connection actors: triggers the closing of the connection - * - host-connector actors: triggers the closing of all connections and the shutdown of the host-connector - * - the `HttpManager` actor: triggers the closing of all outgoing and incoming connections, the shutdown of all - * host-connectors and the unbinding of all servers + * An `OutgoingHttpChannel` with a single outgoing HTTP connection as the underlying transport. */ - type CloseCommand = Tcp.CloseCommand - val Close = Tcp.Close - val ConfirmedClose = Tcp.ConfirmedClose - val Abort = Tcp.Abort - - sealed trait ClientConnectionType - object ClientConnectionType { - object Direct extends ClientConnectionType - object AutoProxied extends ClientConnectionType - final case class Proxied(proxyHost: String, proxyPort: Int) extends ClientConnectionType + final case class OutgoingConnection(remoteAddress: InetSocketAddress, + localAddress: InetSocketAddress, + untypedProcessor: HttpClientProcessor[Any]) extends OutgoingChannel { + def processor[T] = untypedProcessor.asInstanceOf[HttpClientProcessor[T]] } + // PREVIEW OF COMING API HERE: + // + // /** + // * An `OutgoingHttpChannel` with a connection pool to a specific host/port as the underlying transport. + // */ + // final case class HostChannel(host: String, port: Int, + // untypedProcessor: HttpClientProcessor[Any]) extends OutgoingChannel { + // def processor[T] = untypedProcessor.asInstanceOf[HttpClientProcessor[T]] + // } + // + // /** + // * A general `OutgoingHttpChannel` with connection pools to all possible host/port combinations + // * as the underlying transport. + // */ + // final case class RequestChannel(untypedProcessor: HttpClientProcessor[Any]) extends OutgoingChannel { + // def processor[T] = untypedProcessor.asInstanceOf[HttpClientProcessor[T]] + // } + final case class Bind(endpoint: InetSocketAddress, backlog: Int, options: immutable.Traversable[Inet.SocketOption], @@ -92,38 +109,6 @@ object Http extends ExtensionKey[HttpExt] { apply(new InetSocketAddress(interface, port), backlog, options, serverSettings, materializerSettings) } - case class Unbind(timeout: Duration) - object Unbind extends Unbind(Duration.Zero) - - type ConnectionClosed = Tcp.ConnectionClosed - val Closed = Tcp.Closed - val Aborted = Tcp.Aborted - val ConfirmedClosed = Tcp.ConfirmedClosed - val PeerClosed = Tcp.PeerClosed - type ErrorClosed = Tcp.ErrorClosed; val ErrorClosed = Tcp.ErrorClosed - - /** - * Response to an `OutgoingHttpChannelSetup` command (in the success case). - * The `transport` actor can be sent `HttpRequest` instances which will be responded - * to with the respective `HttpResponse` instance as soon as the start of which has - * been received. - * The sender of the `OutgoingHttpChannelInfo` response is always identical to the transport. - */ - sealed trait OutgoingHttpChannelInfo { - def transport: ActorRef - } - - final case class Connected(transport: ActorRef, - remoteAddress: InetSocketAddress, - localAddress: InetSocketAddress) extends OutgoingHttpChannelInfo - - final case class HostConnectorInfo(transport: ActorRef, - setup: HostConnectorSetup) extends OutgoingHttpChannelInfo - - final case class HttpRequestChannelInfo(transport: ActorRef) extends OutgoingHttpChannelInfo - - ///////////////////// server-side events //////////////////////// - final case class ServerBinding(localAddress: InetSocketAddress, connectionStream: Producer[IncomingConnection]) @@ -131,15 +116,11 @@ object Http extends ExtensionKey[HttpExt] { requestProducer: Producer[HttpRequest], responseConsumer: Consumer[HttpResponse]) - val Unbound = Tcp.Unbound - case object BindFailedException extends SingletonException - case object UnbindFailedException extends SingletonException - class ConnectionException(message: String) extends RuntimeException(message) - class ConnectionAttemptFailedException(val host: String, val port: Int) extends ConnectionException(s"Connection attempt to $host:$port failed") + class ConnectionAttemptFailedException(val endpoint: InetSocketAddress) extends ConnectionException(s"Connection attempt to $endpoint failed") class RequestTimeoutException(val request: HttpRequest, message: String) extends ConnectionException(message) } @@ -148,10 +129,6 @@ class HttpExt(system: ExtendedActorSystem) extends akka.io.IO.Extension { val Settings = new Settings(system.settings.config getConfig "akka.http") class Settings private[HttpExt] (config: Config) { val ManagerDispatcher = config getString "manager-dispatcher" - val SettingsGroupDispatcher = config getString "settings-group-dispatcher" - val HostConnectorDispatcher = config getString "host-connector-dispatcher" - val ListenerDispatcher = config getString "listener-dispatcher" - val ConnectionDispatcher = config getString "connection-dispatcher" } val manager = system.actorOf(props = HttpManager.props(Settings), name = "IO-HTTP") diff --git a/akka-http-core/src/main/scala/akka/http/HttpManager.scala b/akka-http-core/src/main/scala/akka/http/HttpManager.scala index cf66bd07c7..6784a9171f 100644 --- a/akka-http-core/src/main/scala/akka/http/HttpManager.scala +++ b/akka-http-core/src/main/scala/akka/http/HttpManager.scala @@ -4,13 +4,17 @@ package akka.http -import scala.util.control.NonFatal -import scala.collection.immutable -import akka.io.Inet -import akka.http.model.{ HttpHeader, Uri, HttpRequest } -import akka.http.server.HttpListener +import scala.util.{ Failure, Success } +import scala.concurrent.duration._ +import akka.io.IO +import akka.util.Timeout +import akka.stream.io.StreamTcp +import akka.stream.FlowMaterializer import akka.http.client._ import akka.actor._ +import akka.http.server.{ HttpServerPipeline, ServerSettings } +import akka.pattern.ask +import akka.stream.scaladsl.Flow /** * INTERNAL API @@ -18,232 +22,62 @@ import akka.actor._ * The gateway actor into the low-level HTTP layer. */ private[http] class HttpManager(httpSettings: HttpExt#Settings) extends Actor with ActorLogging { - import HttpManager._ - import httpSettings._ + import context.dispatcher - // counters for naming the various sub-actors we create - private[this] val listenerCounter = Iterator from 0 - private[this] val groupCounter = Iterator from 0 - private[this] val hostConnectorCounter = Iterator from 0 - private[this] val proxyConnectorCounter = Iterator from 0 + private[this] var clientPipelines = Map.empty[ClientConnectionSettings, HttpClientPipeline] - // our child actors - private[this] var settingsGroups = Map.empty[ClientConnectionSettings, ActorRef] - private[this] var connectors = Map.empty[Http.HostConnectorSetup, ActorRef] - private[this] var listeners = Seq.empty[ActorRef] - - /////////////////// INITIAL / RUNNING STATE ////////////////////// - - def receive = withTerminationManagement { - case request: HttpRequest ⇒ - try { - val uri = request.effectiveUri(securedConnection = false) - val connector = connectorForUri(uri) - // we never render absolute URIs, also drop any potentially existing fragment - connector.forward(request.copy(uri = uri.toRelative.withoutFragment)) - } catch { - case NonFatal(e) ⇒ - log.error("Illegal request: {}", e.getMessage) - sender() ! Status.Failure(e) - } - - // 3xx Redirect, sent up from one of our HttpHostConnector children for us to - // forward to the respective HttpHostConnector for the redirection target - case ctx @ HttpHostConnector.RequestContext(req, _, _, commander) ⇒ - val connector = connectorForUri(req.uri) - // we never render absolute URIs, also drop any potentially existing fragment - val newReq = req.copy(uri = req.uri.toRelative.withoutFragment) - connector.tell(ctx.copy(request = newReq), commander) - - case connect: Http.Connect ⇒ - settingsGroupFor(ClientConnectionSettings(connect.settings)).forward(connect) - - case setup: Http.HostConnectorSetup ⇒ - val connector = connectorFor(setup) - sender().tell(Http.HostConnectorInfo(connector, setup), connector) - - // we support sending an HttpRequest instance together with a corresponding HostConnectorSetup - // in one step (rather than sending the setup first and having to wait for the response) - case (request: HttpRequest, setup: Http.HostConnectorSetup) ⇒ - connectorFor(setup).forward(request) - - case Http.HttpRequestChannelSetup ⇒ sender() ! Http.HttpRequestChannelInfo - - case bind: Http.Bind ⇒ + def receive = { + case connect @ Http.Connect(remoteAddress, localAddress, options, settings, materializerSettings) ⇒ + log.debug("Attempting connection to {}", remoteAddress) val commander = sender() - listeners :+= context.watch { - context.actorOf( - props = HttpListener.props(commander, bind, httpSettings), - name = "listener-" + listenerCounter.next()) + val effectiveSettings = ClientConnectionSettings(settings) + val tcpConnect = StreamTcp.Connect(materializerSettings, remoteAddress, localAddress, options, + effectiveSettings.connectingTimeout, effectiveSettings.idleTimeout) + val askTimeout = Timeout(effectiveSettings.connectingTimeout + 5.seconds) // FIXME: how can we improve this? + val tcpConnectionFuture = IO(StreamTcp)(context.system).ask(tcpConnect)(askTimeout) + tcpConnectionFuture onComplete { + case Success(tcpConn: StreamTcp.OutgoingTcpConnection) ⇒ + val pipeline = clientPipelines.getOrElse(effectiveSettings, { + val pl = new HttpClientPipeline(effectiveSettings, FlowMaterializer(materializerSettings), log) + clientPipelines = clientPipelines.updated(effectiveSettings, pl) + pl + }) + commander ! pipeline(tcpConn) + + case Failure(error) ⇒ + log.debug("Could not connect to {} due to {}", remoteAddress, error) + commander ! Status.Failure(new Http.ConnectionAttemptFailedException(remoteAddress)) + + case x ⇒ throw new IllegalStateException("Unexpected response to `Connect` from StreamTcp: " + x) } - case cmd: Http.CloseCommand ⇒ - // start triggering an orderly complete shutdown by first closing all outgoing connections - shutdownSettingsGroups(cmd, Set(sender())) - } + case Http.Bind(endpoint, backlog, options, settings, materializerSettings) ⇒ + log.debug("Binding to {}", endpoint) + val commander = sender() + val effectiveSettings = ServerSettings(settings) + val tcpBind = StreamTcp.Bind(materializerSettings, endpoint, backlog, options) + val askTimeout = Timeout(effectiveSettings.bindTimeout + 5.seconds) // FIXME: how can we improve this? + val tcpServerBindingFuture = IO(StreamTcp)(context.system).ask(tcpBind)(askTimeout) + tcpServerBindingFuture onComplete { + case Success(StreamTcp.TcpServerBinding(localAddress, connectionStream)) ⇒ + log.info("Bound to {}", endpoint) + val materializer = FlowMaterializer(materializerSettings) + val httpServerPipeline = new HttpServerPipeline(effectiveSettings, materializer, log) + val httpConnectionStream = Flow(connectionStream) + .map(httpServerPipeline) + .toProducer(materializer) + commander ! Http.ServerBinding(localAddress, httpConnectionStream) - def withTerminationManagement(behavior: Receive): Receive = ({ - case ev @ Terminated(child) ⇒ - if (listeners contains child) - listeners = listeners filter (_ != child) - else if (connectors exists (_._2 == child)) - connectors = connectors filter { _._2 != child } - else - settingsGroups = settingsGroups filter { _._2 != child } - behavior.applyOrElse(ev, (_: Terminated) ⇒ ()) + case Failure(error) ⇒ + log.warning("Bind to {} failed due to ", endpoint, error) + commander ! Status.Failure(Http.BindFailedException) - case HttpHostConnector.DemandIdleShutdown ⇒ - val hostConnector = sender() - var sendPoisonPill = true - connectors = connectors filter { - case (x: ProxyConnectorSetup, proxiedConnector) if x.proxyConnector == hostConnector ⇒ - proxiedConnector ! HttpHostConnector.DemandIdleShutdown - sendPoisonPill = false // the PoisonPill will be sent by the proxiedConnector - false - case (_, `hostConnector`) ⇒ false - case _ ⇒ true + case x ⇒ throw new IllegalStateException("Unexpected response to `Bind` from StreamTcp: " + x) } - if (sendPoisonPill) hostConnector ! PoisonPill - }: Receive) orElse behavior - - def connectorForUri(uri: Uri) = { - val host = uri.authority.host - connectorFor(Http.HostConnectorSetup(host.toString(), uri.effectivePort, sslEncryption = uri.scheme == "https")) } - - def connectorFor(setup: Http.HostConnectorSetup) = { - val normalizedSetup = resolveAutoProxied(setup) - import Http.ClientConnectionType._ - normalizedSetup.connectionType match { - case _: Proxied ⇒ proxiedConnectorFor(normalizedSetup) - case Direct ⇒ hostConnectorFor(normalizedSetup) - case AutoProxied ⇒ throw new IllegalStateException("Unexpected unresolved connectionType `AutoProxied`") - } - } - - def proxiedConnectorFor(normalizedSetup: Http.HostConnectorSetup): ActorRef = { - val Http.ClientConnectionType.Proxied(proxyHost, proxyPort) = normalizedSetup.connectionType - val proxyConnector = hostConnectorFor(normalizedSetup.copy(host = proxyHost, port = proxyPort)) - val proxySetup = proxyConnectorSetup(normalizedSetup, proxyConnector) - def createAndRegisterProxiedConnector = { - val proxiedConnector = context.actorOf( - props = ProxiedHostConnector.props(normalizedSetup.host, normalizedSetup.port, proxyConnector), - name = "proxy-connector-" + proxyConnectorCounter.next()) - connectors = connectors.updated(proxySetup, proxiedConnector) - context.watch(proxiedConnector) - } - connectors.getOrElse(proxySetup, createAndRegisterProxiedConnector) - } - - def hostConnectorFor(normalizedSetup: Http.HostConnectorSetup): ActorRef = { - def createAndRegisterHostConnector = { - val settingsGroup = settingsGroupFor(normalizedSetup.settings.get.connectionSettings) - val hostConnector = context.actorOf( - props = HttpHostConnector.props(normalizedSetup, settingsGroup, HostConnectorDispatcher), - name = "host-connector-" + hostConnectorCounter.next()) - connectors = connectors.updated(normalizedSetup, hostConnector) - context.watch(hostConnector) - } - connectors.getOrElse(normalizedSetup, createAndRegisterHostConnector) - } - - def settingsGroupFor(settings: ClientConnectionSettings): ActorRef = { - def createAndRegisterSettingsGroup = { - val group = context.actorOf( - props = HttpClientSettingsGroup.props(settings, httpSettings), - name = "group-" + groupCounter.next()) - settingsGroups = settingsGroups.updated(settings, group) - context.watch(group) - } - settingsGroups.getOrElse(settings, createAndRegisterSettingsGroup) - } - - /////////////////// ORDERLY SHUTDOWN PROCESS ////////////////////// - - // TODO: add configurable timeouts for these shutdown steps - - def shutdownSettingsGroups(cmd: Http.CloseCommand, commanders: Set[ActorRef]): Unit = - if (!settingsGroups.isEmpty) { - settingsGroups.values.foreach(_ ! cmd) - context.become(closingSettingsGroups(cmd, commanders)) - } else shutdownHostConnectors(cmd, commanders) // if we are done with the outgoing connections, close all host connectors - - def closingSettingsGroups(cmd: Http.CloseCommand, commanders: Set[ActorRef]): Receive = - withTerminationManagement { - case _: Http.CloseCommand ⇒ // the first CloseCommand we received has precedence over ones potentially sent later - context.become(closingSettingsGroups(cmd, commanders + sender())) - - case Terminated(_) ⇒ - if (settingsGroups.isEmpty) // if we are done with the outgoing connections, close all host connectors - shutdownHostConnectors(cmd, commanders) - } - - def shutdownHostConnectors(cmd: Http.CloseCommand, commanders: Set[ActorRef]): Unit = - if (!connectors.isEmpty) { - connectors.values.foreach(_ ! cmd) - context.become(closingConnectors(cmd, commanders)) - } else shutdownListeners(cmd, commanders) // if we are done with the host connectors, close all listeners - - def closingConnectors(cmd: Http.CloseCommand, commanders: Set[ActorRef]): Receive = - withTerminationManagement { - case _: Http.CloseCommand ⇒ // the first CloseCommand we received has precedence over ones potentially sent later - context.become(closingConnectors(cmd, commanders + sender())) - - case Terminated(_) ⇒ - if (connectors.isEmpty) // if we are done with the host connectors, close all listeners - shutdownListeners(cmd, commanders) - } - - def shutdownListeners(cmd: Http.CloseCommand, commanders: Set[ActorRef]): Unit = { - listeners foreach { x ⇒ x ! cmd } - context.become(unbinding(cmd, commanders)) - if (listeners.isEmpty) self ! Http.Unbound - } - - def unbinding(cmd: Http.CloseCommand, commanders: Set[ActorRef]): Receive = - withTerminationManagement { - case _: Http.CloseCommand ⇒ // the first CloseCommand we received has precedence over ones potentially sent later - context.become(unbinding(cmd, commanders + sender())) - - case Terminated(_) ⇒ - if (connectors.isEmpty) { - // if we are done with the listeners we have completed the full orderly shutdown - commanders.foreach(_ ! cmd.event) - context.become(receive) - } - } } private[http] object HttpManager { def props(httpSettings: HttpExt#Settings) = Props(classOf[HttpManager], httpSettings) withDispatcher httpSettings.ManagerDispatcher - - private class ProxyConnectorSetup(host: String, port: Int, sslEncryption: Boolean, - options: immutable.Traversable[Inet.SocketOption], - settings: Option[HostConnectorSettings], connectionType: Http.ClientConnectionType, - defaultHeaders: immutable.Seq[HttpHeader], val proxyConnector: ActorRef) - extends Http.HostConnectorSetup(host, port, sslEncryption, options, settings, connectionType, defaultHeaders) - - private def proxyConnectorSetup(normalizedSetup: Http.HostConnectorSetup, proxyConnector: ActorRef) = { - import normalizedSetup._ - new ProxyConnectorSetup(host, port, sslEncryption, options, settings, connectionType, defaultHeaders, proxyConnector) - } - - def resolveAutoProxied(setup: Http.HostConnectorSetup)(implicit refFactory: ActorRefFactory) = { - val normalizedSetup = setup.normalized - import normalizedSetup._ - val resolved = - if (sslEncryption) Http.ClientConnectionType.Direct // TODO - else connectionType match { - case Http.ClientConnectionType.AutoProxied ⇒ - val scheme = Uri.httpScheme(sslEncryption) - val proxySettings = settings.get.connectionSettings.proxySettings.get(scheme) - proxySettings.filter(_.matchesHost(host)) match { - case Some(ProxySettings(proxyHost, proxyPort, _)) ⇒ Http.ClientConnectionType.Proxied(proxyHost, proxyPort) - case None ⇒ Http.ClientConnectionType.Direct - } - case x ⇒ x - } - normalizedSetup.copy(connectionType = resolved) - } } \ No newline at end of file diff --git a/akka-http-core/src/main/scala/akka/http/client/ClientConnectionSettings.scala b/akka-http-core/src/main/scala/akka/http/client/ClientConnectionSettings.scala index a70778118c..f413e3c14c 100644 --- a/akka-http-core/src/main/scala/akka/http/client/ClientConnectionSettings.scala +++ b/akka-http-core/src/main/scala/akka/http/client/ClientConnectionSettings.scala @@ -5,7 +5,7 @@ package akka.http.client import com.typesafe.config.Config -import scala.concurrent.duration.Duration +import scala.concurrent.duration.{ FiniteDuration, Duration } import akka.actor.ActorRefFactory import akka.http.model.headers.`User-Agent` import akka.http.parsing.ParserSettings @@ -13,38 +13,25 @@ import akka.http.util._ final case class ClientConnectionSettings( userAgentHeader: Option[`User-Agent`], - connectingTimeout: Duration, + connectingTimeout: FiniteDuration, idleTimeout: Duration, - requestTimeout: Duration, - reapingCycle: Duration, requestHeaderSizeHint: Int, - maxEncryptionChunkSize: Int, - proxySettings: Map[String, ProxySettings], parserSettings: ParserSettings) { - require(connectingTimeout >= Duration.Zero, "connectingTimeout must be > 0 or 'infinite'") - require(idleTimeout >= Duration.Zero, "idleTimeout must be > 0 or 'infinite'") - require(requestTimeout >= Duration.Zero, "requestTimeout must be > 0 or 'infinite'") - require(reapingCycle >= Duration.Zero, "reapingCycle must be > 0 or 'infinite'") + require(connectingTimeout >= Duration.Zero, "connectingTimeout must be > 0") require(requestHeaderSizeHint > 0, "request-size-hint must be > 0") - require(maxEncryptionChunkSize > 0, "max-encryption-chunk-size must be > 0") } object ClientConnectionSettings extends SettingsCompanion[ClientConnectionSettings]("akka.http.client") { def fromSubConfig(c: Config) = { apply( c.getString("user-agent-header").toOption.map(`User-Agent`(_)), - c getPotentiallyInfiniteDuration "connecting-timeout", + c getFiniteDuration "connecting-timeout", c getPotentiallyInfiniteDuration "idle-timeout", - c getPotentiallyInfiniteDuration "request-timeout", - c getPotentiallyInfiniteDuration "reaping-cycle", c getIntBytes "request-header-size-hint", - c getIntBytes "max-encryption-chunk-size", - ProxySettings fromSubConfig c.getConfig("proxy"), ParserSettings fromSubConfig c.getConfig("parsing")) } def apply(optionalSettings: Option[ClientConnectionSettings])(implicit actorRefFactory: ActorRefFactory): ClientConnectionSettings = optionalSettings getOrElse apply(actorSystem) -} - +} \ No newline at end of file diff --git a/akka-http-core/src/main/scala/akka/http/client/HostConnectorSettings.scala b/akka-http-core/src/main/scala/akka/http/client/HostConnectorSettings.scala deleted file mode 100644 index 7210633323..0000000000 --- a/akka-http-core/src/main/scala/akka/http/client/HostConnectorSettings.scala +++ /dev/null @@ -1,33 +0,0 @@ -/** - * Copyright (C) 2009-2014 Typesafe Inc. - */ - -package akka.http.client - -import com.typesafe.config.Config -import scala.concurrent.duration.Duration -import akka.http.util._ - -final case class HostConnectorSettings( - maxConnections: Int, - maxRetries: Int, - maxRedirects: Int, - pipelining: Boolean, - idleTimeout: Duration, - connectionSettings: ClientConnectionSettings) { - - require(maxConnections > 0, "max-connections must be > 0") - require(maxRetries >= 0, "max-retries must be >= 0") - require(maxRedirects >= 0, "max-redirects must be >= 0") - require(idleTimeout >= Duration.Zero, "idleTimeout must be > 0 or 'infinite'") -} - -object HostConnectorSettings extends SettingsCompanion[HostConnectorSettings]("akka.http") { - def fromSubConfig(c: Config) = apply( - c getInt "host-connector.max-connections", - c getInt "host-connector.max-retries", - c getInt "host-connector.max-redirects", - c getBoolean "host-connector.pipelining", - c getPotentiallyInfiniteDuration "host-connector.idle-timeout", - ClientConnectionSettings fromSubConfig c.getConfig("client")) -} diff --git a/akka-http-core/src/main/scala/akka/http/client/HttpClientPipeline.scala b/akka-http-core/src/main/scala/akka/http/client/HttpClientPipeline.scala new file mode 100644 index 0000000000..070f19b55a --- /dev/null +++ b/akka-http-core/src/main/scala/akka/http/client/HttpClientPipeline.scala @@ -0,0 +1,83 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ + +package akka.http.client + +import java.net.InetSocketAddress +import scala.collection.immutable.Queue +import akka.stream.{ FlattenStrategy, FlowMaterializer } +import akka.event.LoggingAdapter +import akka.stream.io.StreamTcp +import akka.stream.scaladsl.{ Flow, Duct } +import akka.http.Http +import akka.http.model.{ HttpMethod, HttpRequest, ErrorInfo, HttpResponse } +import akka.http.rendering.{ RequestRenderingContext, HttpRequestRendererFactory } +import akka.http.parsing.HttpResponseParser +import akka.http.parsing.ParserOutput._ +import akka.http.util._ + +/** + * INTERNAL API + */ +private[http] class HttpClientPipeline(effectiveSettings: ClientConnectionSettings, + materializer: FlowMaterializer, + log: LoggingAdapter) + extends (StreamTcp.OutgoingTcpConnection ⇒ Http.OutgoingConnection) { + + import effectiveSettings._ + + val rootParser = new HttpResponseParser(parserSettings, materializer)() + val warnOnIllegalHeader: ErrorInfo ⇒ Unit = errorInfo ⇒ + if (parserSettings.illegalHeaderWarnings) + log.warning(errorInfo.withSummaryPrepended("Illegal response header").formatPretty) + + val responseRendererFactory = new HttpRequestRendererFactory(userAgentHeader, requestHeaderSizeHint, materializer, log) + + def apply(tcpConn: StreamTcp.OutgoingTcpConnection): Http.OutgoingConnection = { + val requestMethodByPass = new RequestMethodByPass(tcpConn.remoteAddress) + + val (contextBypassConsumer, contextBypassProducer) = + Duct[(HttpRequest, Any)].map(_._2).build(materializer) + + val requestConsumer = + Duct[(HttpRequest, Any)] + .tee(contextBypassConsumer) + .map(requestMethodByPass) + .transform(responseRendererFactory.newRenderer) + .flatten(FlattenStrategy.concat) + .transform(errorLogger(log, "Outgoing request stream error")) + .produceTo(materializer, tcpConn.outputStream) + + val responseProducer = + Flow(tcpConn.inputStream) + .transform(rootParser.copyWith(warnOnIllegalHeader, requestMethodByPass)) + .splitWhen(_.isInstanceOf[MessageStart]) + .headAndTail(materializer) + .collect { + case (ResponseStart(statusCode, protocol, headers, createEntity, _), entityParts) ⇒ + HttpResponse(statusCode, headers, createEntity(entityParts), protocol) + } + .zip(contextBypassProducer) + .toProducer(materializer) + + val processor = HttpClientProcessor(requestConsumer.getSubscriber, responseProducer.getPublisher) + Http.OutgoingConnection(tcpConn.remoteAddress, tcpConn.localAddress, processor) + } + + class RequestMethodByPass(serverAddress: InetSocketAddress) + extends (((HttpRequest, Any)) ⇒ RequestRenderingContext) with (() ⇒ HttpMethod) { + private[this] var requestMethods = Queue.empty[HttpMethod] + def apply(tuple: (HttpRequest, Any)) = { + val request = tuple._1 + requestMethods = requestMethods.enqueue(request.method) + RequestRenderingContext(request, serverAddress) + } + def apply(): HttpMethod = + if (requestMethods.nonEmpty) { + val method = requestMethods.head + requestMethods = requestMethods.tail + method + } else HttpResponseParser.NoMethod + } +} \ No newline at end of file diff --git a/akka-http-core/src/main/scala/akka/http/client/HttpClientProcessor.scala b/akka-http-core/src/main/scala/akka/http/client/HttpClientProcessor.scala new file mode 100644 index 0000000000..055821dbe2 --- /dev/null +++ b/akka-http-core/src/main/scala/akka/http/client/HttpClientProcessor.scala @@ -0,0 +1,26 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ + +package akka.http.client + +import akka.http.model.{ HttpResponse, HttpRequest } +import org.reactivestreams.spi.{ Publisher, Subscriber } +import org.reactivestreams.api.{ Consumer, Processor } + +/** + * A `HttpClientProcessor` models an HTTP client as a stream processor that provides + * responses for requests with an attached context object of a custom type, + * which is funneled through and completely transparent to the processor itself. + */ +trait HttpClientProcessor[T] extends Processor[(HttpRequest, T), (HttpResponse, T)] + +object HttpClientProcessor { + def apply[T](requestSubscriber: Subscriber[(HttpRequest, T)], + responsePublisher: Publisher[(HttpResponse, T)]): HttpClientProcessor[T] = + new HttpClientProcessor[T] { + def getSubscriber = requestSubscriber + def getPublisher = responsePublisher + def produceTo(consumer: Consumer[(HttpResponse, T)]): Unit = responsePublisher.subscribe(consumer.getSubscriber) + } +} \ No newline at end of file diff --git a/akka-http-core/src/main/scala/akka/http/client/HttpClientSettingsGroup.scala b/akka-http-core/src/main/scala/akka/http/client/HttpClientSettingsGroup.scala deleted file mode 100644 index c1d321763c..0000000000 --- a/akka-http-core/src/main/scala/akka/http/client/HttpClientSettingsGroup.scala +++ /dev/null @@ -1,24 +0,0 @@ -/** - * Copyright (C) 2009-2014 Typesafe Inc. - */ - -package akka.http.client - -import akka.http.HttpExt -import akka.actor.{ Props, Actor } - -/** - * INTERNAL API - */ -private[http] class HttpClientSettingsGroup(settings: ClientConnectionSettings, - httpSettings: HttpExt#Settings) extends Actor { - def receive: Receive = ??? // TODO -} - -/** - * INTERNAL API - */ -private[http] object HttpClientSettingsGroup { - def props(settings: ClientConnectionSettings, httpSettings: HttpExt#Settings) = - Props(classOf[HttpClientSettingsGroup], httpSettings) withDispatcher httpSettings.SettingsGroupDispatcher -} \ No newline at end of file diff --git a/akka-http-core/src/main/scala/akka/http/client/HttpHostConnector.scala b/akka-http-core/src/main/scala/akka/http/client/HttpHostConnector.scala deleted file mode 100644 index 874675a9f2..0000000000 --- a/akka-http-core/src/main/scala/akka/http/client/HttpHostConnector.scala +++ /dev/null @@ -1,58 +0,0 @@ -/** - * Copyright (C) 2009-2014 Typesafe Inc. - */ - -package akka.http.client - -import scala.collection.immutable -import akka.actor.{ Props, ActorRef, Actor, ActorLogging } -import akka.http.model.HttpRequest -import akka.http.Http - -/** - * INTERNAL API - */ -private[http] class HttpHostConnector(normalizedSetup: Http.HostConnectorSetup, clientConnectionSettingsGroup: ActorRef) - extends Actor with ActorLogging { - - def receive: Receive = ??? // TODO - -} - -/** - * INTERNAL API - */ -private[http] object HttpHostConnector { - def props(normalizedSetup: Http.HostConnectorSetup, clientConnectionSettingsGroup: ActorRef, dispatcher: String) = - Props(classOf[HttpHostConnector], normalizedSetup, clientConnectionSettingsGroup) withDispatcher dispatcher - - final case class RequestContext(request: HttpRequest, retriesLeft: Int, redirectsLeft: Int, commander: ActorRef) - final case class Disconnected(rescheduledRequestCount: Int) - case object RequestCompleted - case object DemandIdleShutdown - - sealed trait SlotState { - def enqueue(request: HttpRequest): SlotState - def dequeueOne: SlotState - def openRequestCount: Int - } - object SlotState { - sealed abstract class WithoutRequests extends SlotState { - def enqueue(request: HttpRequest) = Connected(immutable.Queue(request)) - def dequeueOne = throw new IllegalStateException - def openRequestCount = 0 - } - case object Unconnected extends WithoutRequests - final case class Connected(openRequests: immutable.Queue[HttpRequest]) extends SlotState { - require(openRequests.nonEmpty) - def enqueue(request: HttpRequest) = Connected(openRequests.enqueue(request)) - def dequeueOne = { - val reqs = openRequests.tail - if (reqs.isEmpty) Idle - else Connected(reqs) - } - def openRequestCount = openRequests.size - } - case object Idle extends WithoutRequests - } -} diff --git a/akka-http-core/src/main/scala/akka/http/client/OutgoingHttpChannel.scala b/akka-http-core/src/main/scala/akka/http/client/OutgoingHttpChannel.scala deleted file mode 100644 index 75dd159562..0000000000 --- a/akka-http-core/src/main/scala/akka/http/client/OutgoingHttpChannel.scala +++ /dev/null @@ -1,49 +0,0 @@ -/** - * Copyright (C) 2009-2014 Typesafe Inc. - */ - -package akka.http.client - -import akka.http.model.{ HttpResponse, HttpRequest } -import org.reactivestreams.api.Processor -import java.net.InetSocketAddress - -/** - * A `HttpClientProcessor` models an HTTP client as a stream processor that provides - * responses for requests with an attached context object of a custom type, - * which is funneled through and completely transparent to the processor itself. - */ -trait HttpClientProcessor[T] extends Processor[(HttpRequest, T), (HttpResponse, T)] - -/** - * An `OutgoingHttpChannel` is a provision of an `HttpClientProcessor` with potentially - * available additional information about the client or server. - */ -sealed trait OutgoingHttpChannel { - def processor[T]: HttpClientProcessor[T] -} - -/** - * An `OutgoingHttpChannel` with a single outgoing HTTP connection as the underlying transport. - */ -final case class OutgoingHttpConnection(remoteAddress: InetSocketAddress, - localAddress: InetSocketAddress, - untypedProcessor: HttpClientProcessor[Any]) extends OutgoingHttpChannel { - def processor[T] = untypedProcessor.asInstanceOf[HttpClientProcessor[T]] -} - -/** - * An `OutgoingHttpChannel` with a connection pool to a specific host/port as the underlying transport. - */ -final case class HttpHostChannel(host: String, port: Int, - untypedProcessor: HttpClientProcessor[Any]) extends OutgoingHttpChannel { - def processor[T] = untypedProcessor.asInstanceOf[HttpClientProcessor[T]] -} - -/** - * A general `OutgoingHttpChannel` with connection pools to all possible host/port combinations - * as the underlying transport. - */ -final case class HttpRequestChannel(untypedProcessor: HttpClientProcessor[Any]) extends OutgoingHttpChannel { - def processor[T] = untypedProcessor.asInstanceOf[HttpClientProcessor[T]] -} diff --git a/akka-http-core/src/main/scala/akka/http/client/ProxiedHostConnector.scala b/akka-http-core/src/main/scala/akka/http/client/ProxiedHostConnector.scala deleted file mode 100644 index b04f2fec95..0000000000 --- a/akka-http-core/src/main/scala/akka/http/client/ProxiedHostConnector.scala +++ /dev/null @@ -1,58 +0,0 @@ -/** - * Copyright (C) 2009-2014 Typesafe Inc. - */ - -package akka.http.client - -import akka.actor._ -import akka.http.model -import model.{ HttpRequest, Uri } - -/** - * INTERNAL API - * - * A wrapper around a [[HttpHostConnector]] that is connected to a proxy. Fixes missing Host headers and - * relative URIs or otherwise warns if these differ from the target host/port. - */ -private[http] class ProxiedHostConnector(host: String, port: Int, proxyConnector: ActorRef) extends Actor with ActorLogging { - - import Uri._ - val authority = Authority(Host(host), port).normalizedForHttp() - val hostHeader = model.headers.Host(host, authority.port) - - context.watch(proxyConnector) - - def receive: Receive = { - case request: HttpRequest ⇒ - val headers = request.header[model.headers.Host] match { - case Some(reqHostHeader) ⇒ - if (authority != Authority(reqHostHeader.host, reqHostHeader.port).normalizedForHttp()) - log.warning(s"sending request with header '$reqHostHeader' to a proxied connection to $authority") - request.headers - case None ⇒ request.headers :+ hostHeader - } - val effectiveUri = - if (request.uri.isRelative) - request.uri.toEffectiveHttpRequestUri(authority.host, port) - else { - if (authority != request.uri.authority.normalizedForHttp()) - log.warning(s"sending request with absolute URI '${request.uri}' to a proxied connection to $authority") - request.uri - } - proxyConnector.forward(request.copy(uri = effectiveUri).withHeaders(headers)) - - case HttpHostConnector.DemandIdleShutdown ⇒ - proxyConnector ! PoisonPill - context.stop(self) - - case Terminated(`proxyConnector`) ⇒ - context.stop(self) - - case x ⇒ proxyConnector.forward(x) - } -} - -private[http] object ProxiedHostConnector { - def props(host: String, port: Int, proxyConnector: ActorRef) = - Props(classOf[ProxiedHostConnector], host, port, proxyConnector) -} \ No newline at end of file diff --git a/akka-http-core/src/main/scala/akka/http/client/ProxySettings.scala b/akka-http-core/src/main/scala/akka/http/client/ProxySettings.scala deleted file mode 100644 index 9c7a792946..0000000000 --- a/akka-http-core/src/main/scala/akka/http/client/ProxySettings.scala +++ /dev/null @@ -1,74 +0,0 @@ -/** - * Copyright (C) 2009-2014 Typesafe Inc. - */ - -package akka.http.client - -import com.typesafe.config.{ ConfigValueType, Config } -import scala.annotation.tailrec -import scala.collection.JavaConverters._ -import akka.http.util._ - -final case class ProxySettings(host: String, port: Int, nonProxyHosts: List[String]) { - require(host.nonEmpty, "proxy host must be non-empty") - require(0 < port && port < 65536, "illegal proxy port") - require(nonProxyHosts forall validIgnore, "illegal nonProxyHosts") - - // see http://docs.oracle.com/javase/6/docs/technotes/guides/net/proxies.html - private def validIgnore(pattern: String) = pattern.exists(_ != '*') && !pattern.drop(1).dropRight(1).contains('*') - - val matchesHost: String ⇒ Boolean = { - @tailrec def rec(remainingNonProxyHosts: List[String], result: String ⇒ Boolean): String ⇒ Boolean = - remainingNonProxyHosts match { - case Nil ⇒ result - case pattern :: remaining ⇒ - val check: String ⇒ Boolean = - (pattern endsWith '*', pattern startsWith '*') match { - case (true, true) ⇒ - val p = pattern.drop(1).dropRight(1); _.contains(p) - case (true, false) ⇒ - val p = pattern.dropRight(1); _.startsWith(p) - case (false, true) ⇒ - val p = pattern.drop(1); _.endsWith(p) - case _ ⇒ _ == pattern - } - rec(remaining, host ⇒ !check(host) && result(host)) - } - rec(nonProxyHosts, result = _ ⇒ true) - } -} - -object ProxySettings extends SettingsCompanion[Map[String, ProxySettings]]("akka.http.client.proxy") { - // see http://docs.oracle.com/javase/6/docs/technotes/guides/net/proxies.html - def fromProperties(properties: Map[String, String], scheme: String): Option[ProxySettings] = { - val proxyHost = properties.get(s"$scheme.proxyHost") - val proxyPort = properties.get(s"$scheme.proxyPort") - val nonProxyHosts = properties.get(s"$scheme.nonProxyHosts") - proxyHost map (apply( - _, - proxyPort.getOrElse("80").toInt, - nonProxyHosts.map(_.fastSplit('|')).getOrElse(Nil).toList)) - } - - def fromSubConfig(c: Config) = apply(c, sys.props.toMap): Map[String, ProxySettings] - - def apply(c: Config, properties: Map[String, String]): Map[String, ProxySettings] = { - def proxySettings(scheme: String) = c.getValue(scheme).valueType() match { - case ConfigValueType.STRING ⇒ - c.getString(scheme) match { - case "default" ⇒ fromProperties(properties, scheme).map((scheme, _)) - case "none" ⇒ None - case unknown ⇒ throw new IllegalArgumentException(s"illegal value for proxy.$scheme: '$unknown'") - } - case _ ⇒ - val cfg = c getConfig scheme - Some(scheme -> apply( - cfg getString "host", - cfg getInt "port", - (cfg getStringList "non-proxy-hosts").asScala.toList)) - } - - val schemes = c.entrySet.asScala.groupBy(_.getKey.split("\\.")(0)).keySet - schemes.flatMap(proxySettings).toMap - } -} diff --git a/akka-http-core/src/main/scala/akka/http/model/HttpEntity.scala b/akka-http-core/src/main/scala/akka/http/model/HttpEntity.scala index 3f97fe120e..0d302e514b 100644 --- a/akka-http-core/src/main/scala/akka/http/model/HttpEntity.scala +++ b/akka-http-core/src/main/scala/akka/http/model/HttpEntity.scala @@ -158,6 +158,11 @@ object HttpEntity { def extension: String def isLastChunk: Boolean } + object ChunkStreamPart { + implicit def apply(string: String): ChunkStreamPart = Chunk(string) + implicit def apply(bytes: Array[Byte]): ChunkStreamPart = Chunk(bytes) + implicit def apply(bytes: ByteString): ChunkStreamPart = Chunk(bytes) + } /** * An intermediate entity chunk guaranteed to carry non-empty data. @@ -166,6 +171,10 @@ object HttpEntity { require(data.nonEmpty, "An HttpEntity.Chunk must have non-empty data") def isLastChunk = false } + object Chunk { + def apply(string: String): Chunk = apply(ByteString(string)) + def apply(bytes: Array[Byte]): Chunk = apply(ByteString(bytes)) + } /** * The final chunk of a chunk stream. diff --git a/akka-http-core/src/main/scala/akka/http/model/HttpMessage.scala b/akka-http-core/src/main/scala/akka/http/model/HttpMessage.scala index 338db0a538..166640f594 100644 --- a/akka-http-core/src/main/scala/akka/http/model/HttpMessage.scala +++ b/akka-http-core/src/main/scala/akka/http/model/HttpMessage.scala @@ -97,7 +97,9 @@ final case class HttpRequest(method: HttpMethod = HttpMethods.GET, entity: HttpEntity.Regular = HttpEntity.Empty, protocol: HttpProtocol = HttpProtocols.`HTTP/1.1`) extends HttpMessage { require(!uri.isEmpty, "An HttpRequest must not have an empty Uri") - require(entity.isKnownEmpty || method.isEntityAccepted) + require(entity.isKnownEmpty || method.isEntityAccepted, "Requests with this method must have an empty entity") + require(protocol == HttpProtocols.`HTTP/1.1` || !entity.isInstanceOf[HttpEntity.Chunked], + "HTTP/1.0 requests must not have a chunked entity") type Self = HttpRequest @@ -108,23 +110,8 @@ final case class HttpRequest(method: HttpMethod = HttpMethods.GET, * Returns a copy of this requests with the URI resolved according to the logic defined at * http://tools.ietf.org/html/rfc7230#section-5.5 */ - def effectiveUri(securedConnection: Boolean, defaultHostHeader: Host = Host.empty): Uri = { - val hostHeader = header[Host] - if (uri.isRelative) { - def fail(detail: String) = - throw new IllegalUriException(s"Cannot establish effective request URI of $this, request has a relative URI and $detail") - val Host(host, port) = hostHeader match { - case None ⇒ if (defaultHostHeader.isEmpty) fail("is missing a `Host` header") else defaultHostHeader - case Some(x) if x.isEmpty ⇒ if (defaultHostHeader.isEmpty) fail("an empty `Host` header") else defaultHostHeader - case Some(x) ⇒ x - } - uri.toEffectiveHttpRequestUri(host, port, securedConnection) - } else // http://tools.ietf.org/html/rfc7230#section-5.4 - if (hostHeader.isEmpty || uri.authority.isEmpty && hostHeader.get.isEmpty || - hostHeader.get.host.equalsIgnoreCase(uri.authority.host)) uri - else throw new IllegalUriException("'Host' header value doesn't match request target authority", - s"Host header: $hostHeader\nrequest target authority: ${uri.authority}") - } + def effectiveUri(securedConnection: Boolean, defaultHostHeader: Host = Host.empty): Uri = + HttpRequest.effectiveUri(uri, headers, securedConnection, defaultHostHeader) /** * The media-ranges accepted by the client according to the `Accept` request header. @@ -263,6 +250,30 @@ final case class HttpRequest(method: HttpMethod = HttpMethods.GET, else this } +object HttpRequest { + /** + * Determines the effective request URI according to the logic defined at + * http://tools.ietf.org/html/rfc7230#section-5.5 + */ + def effectiveUri(uri: Uri, headers: immutable.Seq[HttpHeader], securedConnection: Boolean, defaultHostHeader: Host): Uri = { + val hostHeader = headers.collectFirst { case x: Host ⇒ x } + if (uri.isRelative) { + def fail(detail: String) = + throw new IllegalUriException(s"Cannot establish effective URI of request to `$uri`, request has a relative URI and $detail") + val Host(host, port) = hostHeader match { + case None ⇒ if (defaultHostHeader.isEmpty) fail("is missing a `Host` header") else defaultHostHeader + case Some(x) if x.isEmpty ⇒ if (defaultHostHeader.isEmpty) fail("an empty `Host` header") else defaultHostHeader + case Some(x) ⇒ x + } + uri.toEffectiveHttpRequestUri(host, port, securedConnection) + } else // http://tools.ietf.org/html/rfc7230#section-5.4 + if (hostHeader.isEmpty || uri.authority.isEmpty && hostHeader.get.isEmpty || + hostHeader.get.host.equalsIgnoreCase(uri.authority.host)) uri + else throw new IllegalUriException("'Host' header value of request to `$uri` doesn't match request target authority", + s"Host header: $hostHeader\nrequest target authority: ${uri.authority}") + } +} + /** * The immutable HTTP response model. */ diff --git a/akka-http-core/src/main/scala/akka/http/model/headers/headers.scala b/akka-http-core/src/main/scala/akka/http/model/headers/headers.scala index 4bd761b251..443a20244f 100644 --- a/akka-http-core/src/main/scala/akka/http/model/headers/headers.scala +++ b/akka-http-core/src/main/scala/akka/http/model/headers/headers.scala @@ -301,7 +301,7 @@ sealed abstract case class Expect private () extends ModeledHeader { // http://tools.ietf.org/html/rfc7230#section-5.4 object Host extends ModeledCompanion { - def apply(address: InetSocketAddress): Host = apply(address.getHostName, address.getPort) + def apply(address: InetSocketAddress): Host = apply(address.getHostName, address.getPort) // TODO: upgrade to `getHostString` once we are on JDK7 def apply(host: String): Host = apply(host, 0) def apply(host: String, port: Int): Host = apply(Uri.Host(host), port) val empty = Host("") diff --git a/akka-http-core/src/main/scala/akka/http/parsing/HttpMessageParser.scala b/akka-http-core/src/main/scala/akka/http/parsing/HttpMessageParser.scala index a76f406126..7641ce9f3e 100644 --- a/akka-http-core/src/main/scala/akka/http/parsing/HttpMessageParser.scala +++ b/akka-http-core/src/main/scala/akka/http/parsing/HttpMessageParser.scala @@ -29,6 +29,8 @@ private[http] abstract class HttpMessageParser[Output >: ParserOutput.MessageOut private[this] val result = new ListBuffer[Output] // transformer op is currently optimized for LinearSeqs private[this] var state: ByteString ⇒ StateResult = startNewMessage(_, 0) private[this] var protocol: HttpProtocol = `HTTP/1.1` + private[this] var terminated = false + override def isComplete = terminated def onNext(input: ByteString): immutable.Seq[Output] = { result.clear() @@ -40,9 +42,13 @@ private[http] abstract class HttpMessageParser[Output >: ParserOutput.MessageOut result.toList } - def startNewMessage(input: ByteString, offset: Int): StateResult = - try parseMessage(input, offset) - catch { case NotEnoughDataException ⇒ continue(input, offset)(startNewMessage) } + def startNewMessage(input: ByteString, offset: Int): StateResult = { + def _startNewMessage(input: ByteString, offset: Int): StateResult = + try parseMessage(input, offset) + catch { case NotEnoughDataException ⇒ continue(input, offset)(_startNewMessage) } + + _startNewMessage(input, offset) + } def parseMessage(input: ByteString, offset: Int): StateResult @@ -83,16 +89,16 @@ private[http] abstract class HttpMessageParser[Output >: ParserOutput.MessageOut parseHeaderLines(input, lineEnd, h :: headers, headerCount + 1, Some(h), clh, cth, teh, hh) case h: `Content-Length` ⇒ - if (clh.isEmpty) parseHeaderLines(input, lineEnd, h :: headers, headerCount + 1, ch, Some(h), cth, teh, hh) + if (clh.isEmpty) parseHeaderLines(input, lineEnd, headers, headerCount + 1, ch, Some(h), cth, teh, hh) else fail("HTTP message must not contain more than one Content-Length header") case h: `Content-Type` ⇒ - if (cth.isEmpty) parseHeaderLines(input, lineEnd, h :: headers, headerCount + 1, ch, clh, Some(h), teh, hh) + if (cth.isEmpty) parseHeaderLines(input, lineEnd, headers, headerCount + 1, ch, clh, Some(h), teh, hh) else if (cth.get == h) parseHeaderLines(input, lineEnd, headers, headerCount, ch, clh, cth, teh, hh) else fail("HTTP message must not contain more than one Content-Type header") case h: `Transfer-Encoding` ⇒ - parseHeaderLines(input, lineEnd, h :: headers, headerCount + 1, ch, clh, cth, Some(h), hh) + parseHeaderLines(input, lineEnd, headers, headerCount + 1, ch, clh, cth, Some(h), hh) case h if headerCount < settings.maxHeaderCount ⇒ parseHeaderLines(input, lineEnd, h :: headers, headerCount + 1, ch, clh, cth, teh, hh || h.isInstanceOf[Host]) @@ -111,21 +117,23 @@ private[http] abstract class HttpMessageParser[Output >: ParserOutput.MessageOut clh: Option[`Content-Length`], cth: Option[`Content-Type`], teh: Option[`Transfer-Encoding`], hostHeaderPresent: Boolean, closeAfterResponseCompletion: Boolean): StateResult - def parseFixedLengthBody(remainingBodyBytes: Long)(input: ByteString, bodyStart: Int): StateResult = { + def parseFixedLengthBody(remainingBodyBytes: Long, + isLastMessage: Boolean)(input: ByteString, bodyStart: Int): StateResult = { val remainingInputBytes = input.length - bodyStart if (remainingInputBytes > 0) { if (remainingInputBytes < remainingBodyBytes) { emit(ParserOutput.EntityPart(input drop bodyStart)) - continue(parseFixedLengthBody(remainingBodyBytes - remainingInputBytes)) + continue(parseFixedLengthBody(remainingBodyBytes - remainingInputBytes, isLastMessage)) } else { val offset = bodyStart + remainingBodyBytes.toInt emit(ParserOutput.EntityPart(input.slice(bodyStart, offset))) - startNewMessage(input, offset) + if (isLastMessage) terminate() + else startNewMessage(input, offset) } - } else continue(input, bodyStart)(parseFixedLengthBody(remainingBodyBytes)) + } else continue(input, bodyStart)(parseFixedLengthBody(remainingBodyBytes, isLastMessage)) } - def parseChunk(input: ByteString, offset: Int): StateResult = { + def parseChunk(input: ByteString, offset: Int, isLastMessage: Boolean): StateResult = { @tailrec def parseTrailer(extension: String, lineStart: Int, headers: List[HttpHeader] = Nil, headerCount: Int = 0): StateResult = { val lineEnd = headerParser.parseHeaderLine(input, lineStart)() @@ -134,7 +142,8 @@ private[http] abstract class HttpMessageParser[Output >: ParserOutput.MessageOut val lastChunk = if (extension.isEmpty && headers.isEmpty) HttpEntity.LastChunk else HttpEntity.LastChunk(extension, headers) emit(ParserOutput.EntityChunk(lastChunk)) - startNewMessage(input, lineEnd) + if (isLastMessage) terminate() + else startNewMessage(input, lineEnd) case header if headerCount < settings.maxHeaderCount ⇒ parseTrailer(extension, lineEnd, header :: headers, headerCount + 1) case _ ⇒ fail(s"Chunk trailer contains more than the configured limit of ${settings.maxHeaderCount} headers") @@ -146,7 +155,7 @@ private[http] abstract class HttpMessageParser[Output >: ParserOutput.MessageOut val chunkBodyEnd = cursor + chunkSize def result(terminatorLen: Int) = { emit(ParserOutput.EntityChunk(HttpEntity.Chunk(input.slice(cursor, chunkBodyEnd), extension))) - parseChunk(input, chunkBodyEnd + terminatorLen) + parseChunk(input, chunkBodyEnd + terminatorLen, isLastMessage) } byteChar(input, chunkBodyEnd) match { case '\r' if byteChar(input, chunkBodyEnd + 1) == '\n' ⇒ result(2) @@ -177,7 +186,7 @@ private[http] abstract class HttpMessageParser[Output >: ParserOutput.MessageOut try parseSize(offset, 0) catch { - case NotEnoughDataException ⇒ continue(input, offset)(parseChunk) + case NotEnoughDataException ⇒ continue(input, offset)(parseChunk(_, _, isLastMessage)) } } @@ -190,12 +199,12 @@ private[http] abstract class HttpMessageParser[Output >: ParserOutput.MessageOut case 0 ⇒ next(_, 0) case 1 ⇒ throw new IllegalStateException } - null // StateResult is a phantom type + done() } def continue(next: (ByteString, Int) ⇒ StateResult): StateResult = { state = next(_, 0) - null // StateResult is a phantom type + done() } def fail(summary: String): StateResult = fail(summary, "") @@ -204,9 +213,16 @@ private[http] abstract class HttpMessageParser[Output >: ParserOutput.MessageOut def fail(status: StatusCode, summary: String, detail: String = ""): StateResult = fail(status, ErrorInfo(summary, detail)) def fail(status: StatusCode, info: ErrorInfo): StateResult = { emit(ParserOutput.ParseError(status, info)) - null // StateResult is a phantom type + terminate() } + def terminate(): StateResult = { + terminated = true + done() + } + + def done(): StateResult = null // StateResult is a phantom type + def contentType(cth: Option[`Content-Type`]) = cth match { case Some(x) ⇒ x.contentType case None ⇒ ContentTypes.`application/octet-stream` diff --git a/akka-http-core/src/main/scala/akka/http/parsing/HttpRequestParser.scala b/akka-http-core/src/main/scala/akka/http/parsing/HttpRequestParser.scala index 250e82608f..fdab5975cf 100644 --- a/akka-http-core/src/main/scala/akka/http/parsing/HttpRequestParser.scala +++ b/akka-http-core/src/main/scala/akka/http/parsing/HttpRequestParser.scala @@ -131,14 +131,14 @@ private[http] class HttpRequestParser(_settings: ParserSettings, startNewMessage(input, bodyStart + cl) } else { emitRequestStart(defaultEntity(cth, contentLength, materializer)) - parseFixedLengthBody(contentLength)(input, bodyStart) + parseFixedLengthBody(contentLength, closeAfterResponseCompletion)(input, bodyStart) } case Some(te) ⇒ if (te.encodings.size == 1 && te.hasChunked) { if (clh.isEmpty) { emitRequestStart(chunkedEntity(cth, materializer)) - parseChunk(input, bodyStart) + parseChunk(input, bodyStart, closeAfterResponseCompletion) } else fail("A chunked request must not contain a Content-Length header.") } else fail(NotImplemented, s"`$te` is not supported by this server") } diff --git a/akka-http-core/src/main/scala/akka/http/parsing/HttpResponseParser.scala b/akka-http-core/src/main/scala/akka/http/parsing/HttpResponseParser.scala index 33d1a06f74..d529ddf62d 100644 --- a/akka-http-core/src/main/scala/akka/http/parsing/HttpResponseParser.scala +++ b/akka-http-core/src/main/scala/akka/http/parsing/HttpResponseParser.scala @@ -12,24 +12,26 @@ import akka.stream.scaladsl.Flow import akka.util.ByteString import akka.http.model._ import headers._ +import HttpResponseParser.NoMethod /** * INTERNAL API */ private[http] class HttpResponseParser(_settings: ParserSettings, - materializer: FlowMaterializer)(_headerParser: HttpHeaderParser = HttpHeaderParser(_settings)) + materializer: FlowMaterializer, + dequeueRequestMethodForNextResponse: () ⇒ HttpMethod = () ⇒ NoMethod)(_headerParser: HttpHeaderParser = HttpHeaderParser(_settings)) extends HttpMessageParser[ParserOutput.ResponseOutput](_settings, _headerParser) { - import HttpResponseParser.NoMethod - private[this] var requestMethodForCurrentResponse: HttpMethod = NoMethod private[this] var statusCode: StatusCode = StatusCodes.OK - def copyWith(warnOnIllegalHeader: ErrorInfo ⇒ Unit): HttpResponseParser = - new HttpResponseParser(settings, materializer)(headerParser.copyWith(warnOnIllegalHeader)) + def copyWith(warnOnIllegalHeader: ErrorInfo ⇒ Unit, dequeueRequestMethodForNextResponse: () ⇒ HttpMethod): HttpResponseParser = + new HttpResponseParser(settings, materializer, dequeueRequestMethodForNextResponse)(headerParser.copyWith(warnOnIllegalHeader)) - def setRequestMethodForNextResponse(method: HttpMethod): Unit = - requestMethodForCurrentResponse = method + override def startNewMessage(input: ByteString, offset: Int): StateResult = { + requestMethodForCurrentResponse = dequeueRequestMethodForNextResponse() + super.startNewMessage(input, offset) + } def parseMessage(input: ByteString, offset: Int): StateResult = if (requestMethodForCurrentResponse ne NoMethod) { @@ -93,7 +95,7 @@ private[http] class HttpResponseParser(_settings: ParserSettings, startNewMessage(input, bodyStart + cl) } else { emitResponseStart(defaultEntity(cth, contentLength, materializer)) - parseFixedLengthBody(contentLength)(input, bodyStart) + parseFixedLengthBody(contentLength, closeAfterResponseCompletion)(input, bodyStart) } case None ⇒ emitResponseStart { entityParts ⇒ @@ -107,7 +109,7 @@ private[http] class HttpResponseParser(_settings: ParserSettings, if (te.encodings.size == 1 && te.hasChunked) { if (clh.isEmpty) { emitResponseStart(chunkedEntity(cth, materializer)) - parseChunk(input, bodyStart) + parseChunk(input, bodyStart, closeAfterResponseCompletion) } else fail("A chunked request must not contain a Content-Length header.") } else fail(s"`$te` is not supported by this client") } @@ -127,6 +129,6 @@ private[http] class HttpResponseParser(_settings: ParserSettings, /** * INTERNAL API */ -private[parsing] object HttpResponseParser { +private[http] object HttpResponseParser { val NoMethod = HttpMethod.custom("NONE", safe = false, idempotent = false, entityAccepted = false) } \ No newline at end of file diff --git a/akka-http-core/src/main/scala/akka/http/parsing/ParserSettings.scala b/akka-http-core/src/main/scala/akka/http/parsing/ParserSettings.scala index 1e4fe2042f..3560926249 100644 --- a/akka-http-core/src/main/scala/akka/http/parsing/ParserSettings.scala +++ b/akka-http-core/src/main/scala/akka/http/parsing/ParserSettings.scala @@ -20,7 +20,6 @@ final case class ParserSettings( maxChunkSize: Int, uriParsingMode: Uri.ParsingMode, illegalHeaderWarnings: Boolean, - sslSessionInfoHeader: Boolean, headerValueCacheLimits: Map[String, Int]) { require(maxUriLength > 0, "max-uri-length must be > 0") @@ -53,7 +52,6 @@ object ParserSettings extends SettingsCompanion[ParserSettings]("akka.http.parsi c getIntBytes "max-chunk-size", Uri.ParsingMode(c getString "uri-parsing-mode"), c getBoolean "illegal-header-warnings", - c getBoolean "ssl-session-info-header", cacheConfig.entrySet.asScala.map(kvp ⇒ kvp.getKey -> cacheConfig.getInt(kvp.getKey))(collection.breakOut)) } } diff --git a/akka-http-core/src/main/scala/akka/http/rendering/HttpRequestRendererFactory.scala b/akka-http-core/src/main/scala/akka/http/rendering/HttpRequestRendererFactory.scala new file mode 100644 index 0000000000..99b90684e5 --- /dev/null +++ b/akka-http-core/src/main/scala/akka/http/rendering/HttpRequestRendererFactory.scala @@ -0,0 +1,131 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ + +package akka.http.rendering + +import java.net.InetSocketAddress +import org.reactivestreams.api.Producer +import scala.annotation.tailrec +import scala.collection.immutable +import akka.event.LoggingAdapter +import akka.util.ByteString +import akka.stream.scaladsl.Flow +import akka.stream.{ FlowMaterializer, Transformer } +import akka.stream.impl.SynchronousProducerFromIterable +import akka.http.model._ +import akka.http.util._ +import RenderSupport._ +import headers._ + +/** + * INTERNAL API + */ +private[http] class HttpRequestRendererFactory(userAgentHeader: Option[headers.`User-Agent`], + requestHeaderSizeHint: Int, + materializer: FlowMaterializer, + log: LoggingAdapter) { + + def newRenderer: HttpRequestRenderer = new HttpRequestRenderer + + final class HttpRequestRenderer extends Transformer[RequestRenderingContext, Producer[ByteString]] { + + def onNext(ctx: RequestRenderingContext): immutable.Seq[Producer[ByteString]] = { + val r = new ByteStringRendering(requestHeaderSizeHint) + import ctx.request._ + + def renderRequestLine(): Unit = { + r ~~ method ~~ ' ' + val rawRequestUriRendered = headers.exists { + case `Raw-Request-URI`(rawUri) ⇒ + r ~~ rawUri; true + case _ ⇒ false + } + if (!rawRequestUriRendered) UriRendering.renderUriWithoutFragment(r, uri, UTF8) + r ~~ ' ' ~~ protocol ~~ CrLf + } + + def render(h: HttpHeader) = r ~~ h ~~ CrLf + + @tailrec def renderHeaders(remaining: List[HttpHeader], hostHeaderSeen: Boolean = false, + userAgentSeen: Boolean = false): Unit = + remaining match { + case head :: tail ⇒ head match { + case x: `Content-Length` ⇒ + suppressionWarning(log, x, "explicit `Content-Length` header is not allowed. Use the appropriate HttpEntity subtype.") + renderHeaders(tail, hostHeaderSeen, userAgentSeen) + + case x: `Content-Type` ⇒ + suppressionWarning(log, x, "explicit `Content-Type` header is not allowed. Set `HttpRequest.entity.contentType` instead.") + renderHeaders(tail, hostHeaderSeen, userAgentSeen) + + case `Transfer-Encoding`(_) ⇒ + suppressionWarning(log, head) + renderHeaders(tail, hostHeaderSeen, userAgentSeen) + + case x: `Host` ⇒ + render(x) + renderHeaders(tail, hostHeaderSeen = true, userAgentSeen) + + case x: `User-Agent` ⇒ + render(x) + renderHeaders(tail, hostHeaderSeen, userAgentSeen = true) + + case x: `Raw-Request-URI` ⇒ // we never render this header + renderHeaders(tail, hostHeaderSeen, userAgentSeen) + + case x: RawHeader if x.lowercaseName == "content-type" || + x.lowercaseName == "content-length" || + x.lowercaseName == "transfer-encoding" || + x.lowercaseName == "host" || + x.lowercaseName == "user-agent" ⇒ + suppressionWarning(log, x, "illegal RawHeader") + renderHeaders(tail, hostHeaderSeen, userAgentSeen) + + case x ⇒ + render(x) + renderHeaders(tail, hostHeaderSeen, userAgentSeen) + } + + case Nil ⇒ + if (!hostHeaderSeen) r ~~ Host(ctx.serverAddress) ~~ CrLf + if (!userAgentSeen && userAgentHeader.isDefined) r ~~ userAgentHeader.get ~~ CrLf + } + + def renderContentLength(contentLength: Long): Unit = { + if (method.isEntityAccepted) r ~~ `Content-Length` ~~ contentLength ~~ CrLf + r ~~ CrLf + } + + def completeRequestRendering(): immutable.Seq[Producer[ByteString]] = + entity match { + case HttpEntity.Strict(contentType, data) ⇒ + renderContentLength(data.length) + SynchronousProducerFromIterable(r.get :: data :: Nil) :: Nil + + case HttpEntity.Default(contentType, contentLength, data) ⇒ + renderContentLength(contentLength) + renderByteStrings(r, + Flow(data).transform(new CheckContentLengthTransformer(contentLength)).toProducer(materializer), + materializer) + + case HttpEntity.Chunked(contentType, chunks) ⇒ + r ~~ `Transfer-Encoding` ~~ Chunked ~~ CrLf ~~ CrLf + renderByteStrings(r, Flow(chunks).transform(new ChunkTransformer).toProducer(materializer), materializer) + } + + renderRequestLine() + renderHeaders(headers.toList) + renderEntityContentType(r, entity) + if (entity.isKnownEmpty) { + renderContentLength(0) + SynchronousProducerFromIterable(r.get :: Nil) :: Nil + } else completeRequestRendering() + } + } +} + +/** + * INTERNAL API + */ +private[http] final case class RequestRenderingContext(request: HttpRequest, serverAddress: InetSocketAddress) \ No newline at end of file diff --git a/akka-http-core/src/main/scala/akka/http/rendering/HttpResponseRendererFactory.scala b/akka-http-core/src/main/scala/akka/http/rendering/HttpResponseRendererFactory.scala index 94d2b343bc..0a3e59b4f1 100644 --- a/akka-http-core/src/main/scala/akka/http/rendering/HttpResponseRendererFactory.scala +++ b/akka-http-core/src/main/scala/akka/http/rendering/HttpResponseRendererFactory.scala @@ -10,8 +10,8 @@ import scala.collection.immutable import akka.event.LoggingAdapter import akka.util.ByteString import akka.stream.scaladsl.Flow -import akka.stream.{ FlowMaterializer, Transformer } import akka.stream.impl.SynchronousProducerFromIterable +import akka.stream.{ FlowMaterializer, Transformer } import akka.http.model._ import akka.http.util._ import RenderSupport._ @@ -26,24 +26,26 @@ private[http] class HttpResponseRendererFactory(serverHeader: Option[headers.Ser materializer: FlowMaterializer, log: LoggingAdapter) { - private val serverHeaderPlusDateColonSP: Array[Byte] = + private val renderDefaultServerHeader: Rendering ⇒ Unit = serverHeader match { - case None ⇒ "Date: ".getAsciiBytes - case Some(header) ⇒ (new ByteArrayRendering(64) ~~ header ~~ Rendering.CrLf ~~ "Date: ").get + case Some(h) ⇒ + val bytes = (new ByteArrayRendering(32) ~~ h ~~ CrLf).get + _ ~~ bytes + case None ⇒ _ ⇒ () } - // as an optimization we cache the ServerAndDateHeader of the last second here - @volatile private[this] var cachedServerAndDateHeader: (Long, Array[Byte]) = (0L, null) + // as an optimization we cache the Date header of the last second here + @volatile private[this] var cachedDateHeader: (Long, Array[Byte]) = (0L, null) - private def serverAndDateHeader: Array[Byte] = { - var (cachedSeconds, cachedBytes) = cachedServerAndDateHeader + private def dateHeader: Array[Byte] = { + var (cachedSeconds, cachedBytes) = cachedDateHeader val now = System.currentTimeMillis - if (now / 1000 != cachedSeconds) { + if (now - 1000 > cachedSeconds) { cachedSeconds = now / 1000 - val r = new ByteArrayRendering(serverHeaderPlusDateColonSP.length + 31) - dateTime(now).renderRfc1123DateTimeString(r ~~ serverHeaderPlusDateColonSP) ~~ CrLf + val r = new ByteArrayRendering(48) + dateTime(now).renderRfc1123DateTimeString(r ~~ headers.Date) ~~ CrLf cachedBytes = r.get - cachedServerAndDateHeader = cachedSeconds -> cachedBytes + cachedDateHeader = cachedSeconds -> cachedBytes } cachedBytes } @@ -52,7 +54,7 @@ private[http] class HttpResponseRendererFactory(serverHeader: Option[headers.Ser def newRenderer: HttpResponseRenderer = new HttpResponseRenderer - class HttpResponseRenderer extends Transformer[ResponseRenderingContext, Producer[ByteString]] { + final class HttpResponseRenderer extends Transformer[ResponseRenderingContext, Producer[ByteString]] { private[this] var close = false // signals whether the connection is to be closed after the current response override def isComplete = close @@ -61,32 +63,36 @@ private[http] class HttpResponseRendererFactory(serverHeader: Option[headers.Ser val r = new ByteStringRendering(responseHeaderSizeHint) import ctx.response._ - if (status eq StatusCodes.OK) r ~~ DefaultStatusLine else r ~~ StatusLineStart ~~ status ~~ CrLf - r ~~ serverAndDateHeader + val noEntity = entity.isKnownEmpty || ctx.requestMethod == HttpMethods.HEAD + + def renderStatusLine(): Unit = + if (status eq StatusCodes.OK) r ~~ DefaultStatusLine else r ~~ StatusLineStart ~~ status ~~ CrLf + + def render(h: HttpHeader) = r ~~ h ~~ CrLf @tailrec def renderHeaders(remaining: List[HttpHeader], alwaysClose: Boolean = false, - connHeader: Connection = null): Unit = { - def render(h: HttpHeader) = r ~~ h ~~ CrLf - def suppressionWarning(h: HttpHeader, msg: String = "the akka-http-core layer sets this header automatically!"): Unit = - log.warning("Explicitly set response header '{}' is ignored, {}", h, msg) - + connHeader: Connection = null, serverHeaderSeen: Boolean = false): Unit = remaining match { case head :: tail ⇒ head match { case x: `Content-Length` ⇒ - suppressionWarning(x, "explicit `Content-Length` header is not allowed. Use the appropriate HttpEntity subtype.") - renderHeaders(tail, alwaysClose, connHeader) + suppressionWarning(log, x, "explicit `Content-Length` header is not allowed. Use the appropriate HttpEntity subtype.") + renderHeaders(tail, alwaysClose, connHeader, serverHeaderSeen) case x: `Content-Type` ⇒ - suppressionWarning(x, "explicit `Content-Type` header is not allowed. Set `HttpResponse.entity.contentType`, instead.") - renderHeaders(tail, alwaysClose, connHeader) + suppressionWarning(log, x, "explicit `Content-Type` header is not allowed. Set `HttpResponse.entity.contentType` instead.") + renderHeaders(tail, alwaysClose, connHeader, serverHeaderSeen) - case `Transfer-Encoding`(_) | Date(_) | Server(_) ⇒ - suppressionWarning(head) - renderHeaders(tail, alwaysClose, connHeader) + case `Transfer-Encoding`(_) | Date(_) ⇒ + suppressionWarning(log, head) + renderHeaders(tail, alwaysClose, connHeader, serverHeaderSeen) case x: `Connection` ⇒ val connectionHeader = if (connHeader eq null) x else Connection(x.tokens ++ connHeader.tokens) - renderHeaders(tail, alwaysClose, connectionHeader) + renderHeaders(tail, alwaysClose, connectionHeader, serverHeaderSeen) + + case x: `Server` ⇒ + render(x) + renderHeaders(tail, alwaysClose, connHeader, serverHeaderSeen = true) case x: RawHeader if x.lowercaseName == "content-type" || x.lowercaseName == "content-length" || @@ -94,15 +100,17 @@ private[http] class HttpResponseRendererFactory(serverHeader: Option[headers.Ser x.lowercaseName == "date" || x.lowercaseName == "server" || x.lowercaseName == "connection" ⇒ - suppressionWarning(x, "illegal RawHeader") - renderHeaders(tail, alwaysClose, connHeader) + suppressionWarning(log, x, "illegal RawHeader") + renderHeaders(tail, alwaysClose, connHeader, serverHeaderSeen) case x ⇒ render(x) - renderHeaders(tail, alwaysClose, connHeader) + renderHeaders(tail, alwaysClose, connHeader, serverHeaderSeen) } case Nil ⇒ + if (!serverHeaderSeen) renderDefaultServerHeader(r) + r ~~ dateHeader close = alwaysClose || ctx.closeAfterResponseCompletion || // request wants to close (connHeader != null && connHeader.hasClose) // application wants to close @@ -112,80 +120,46 @@ private[http] class HttpResponseRendererFactory(serverHeader: Option[headers.Ser case _ ⇒ // no need for rendering } } - } - def renderByteStrings(entityBytes: ⇒ Producer[ByteString]): immutable.Seq[Producer[ByteString]] = { - val messageStart = SynchronousProducerFromIterable(r.get :: Nil) - val messageBytes = - if (!entity.isKnownEmpty && ctx.requestMethod != HttpMethods.HEAD) - Flow(messageStart).concat(entityBytes).toProducer(materializer) - else messageStart - messageBytes :: Nil - } + def byteStrings(entityBytes: ⇒ Producer[ByteString]): immutable.Seq[Producer[ByteString]] = + renderByteStrings(r, entityBytes, materializer, skipEntity = noEntity) - def renderContentType(entity: HttpEntity): Unit = - if (!entity.isKnownEmpty && entity.contentType != ContentTypes.NoContentType) - r ~~ `Content-Type` ~~ entity.contentType ~~ CrLf - - def renderEntity(entity: HttpEntity): immutable.Seq[Producer[ByteString]] = + def completeResponseRendering(entity: HttpEntity): immutable.Seq[Producer[ByteString]] = entity match { case HttpEntity.Strict(contentType, data) ⇒ renderHeaders(headers.toList) - renderContentType(entity) + renderEntityContentType(r, entity) r ~~ `Content-Length` ~~ data.length ~~ CrLf ~~ CrLf - if (!entity.isKnownEmpty && ctx.requestMethod != HttpMethods.HEAD) r ~~ data - SynchronousProducerFromIterable(r.get :: Nil) :: Nil + val entityBytes = if (noEntity) Nil else data :: Nil + SynchronousProducerFromIterable(r.get :: entityBytes) :: Nil case HttpEntity.Default(contentType, contentLength, data) ⇒ renderHeaders(headers.toList) - renderContentType(entity) + renderEntityContentType(r, entity) r ~~ `Content-Length` ~~ contentLength ~~ CrLf ~~ CrLf - renderByteStrings(Flow(data).transform(new CheckContentLengthTransformer(contentLength)).toProducer(materializer)) + byteStrings(Flow(data).transform(new CheckContentLengthTransformer(contentLength)).toProducer(materializer)) case HttpEntity.CloseDelimited(contentType, data) ⇒ renderHeaders(headers.toList, alwaysClose = true) - renderContentType(entity) + renderEntityContentType(r, entity) r ~~ CrLf - renderByteStrings(data) + byteStrings(data) case HttpEntity.Chunked(contentType, chunks) ⇒ if (ctx.requestProtocol == `HTTP/1.0`) - renderEntity(HttpEntity.CloseDelimited(contentType, Flow(chunks).map(_.data).toProducer(materializer))) + completeResponseRendering(HttpEntity.CloseDelimited(contentType, Flow(chunks).map(_.data).toProducer(materializer))) else { renderHeaders(headers.toList) - renderContentType(entity) - if (!entity.isKnownEmpty) r ~~ `Transfer-Encoding` ~~ Chunked ~~ CrLf + renderEntityContentType(r, entity) + if (!entity.isKnownEmpty || ctx.requestMethod == HttpMethods.HEAD) + r ~~ `Transfer-Encoding` ~~ Chunked ~~ CrLf r ~~ CrLf - renderByteStrings(Flow(chunks).transform(new ChunkTransformer).toProducer(materializer)) + byteStrings(Flow(chunks).transform(new ChunkTransformer).toProducer(materializer)) } } - renderEntity(entity) - } - } - - class ChunkTransformer extends Transformer[HttpEntity.ChunkStreamPart, ByteString] { - var lastChunkSeen = false - def onNext(chunk: HttpEntity.ChunkStreamPart): immutable.Seq[ByteString] = { - if (chunk.isLastChunk) lastChunkSeen = true - renderChunk(chunk) :: Nil - } - override def isComplete = lastChunkSeen - override def onTermination(e: Option[Throwable]) = if (lastChunkSeen) Nil else defaultLastChunkBytes :: Nil - } - class CheckContentLengthTransformer(length: Long) extends Transformer[ByteString, ByteString] { - var sent = 0L - def onNext(elem: ByteString): immutable.Seq[ByteString] = { - sent += elem.length - if (sent > length) - throw new InvalidContentLengthException(s"Response had declared Content-Length $length but entity chunk stream amounts to more bytes") - elem :: Nil - } - - override def onTermination(e: Option[Throwable]): immutable.Seq[ByteString] = { - if (sent < length) - throw new InvalidContentLengthException(s"Response had declared Content-Length $length but entity chunk stream amounts to ${length - sent} bytes less") - Nil + renderStatusLine() + completeResponseRendering(entity) } } } diff --git a/akka-http-core/src/main/scala/akka/http/rendering/RenderSupport.scala b/akka-http-core/src/main/scala/akka/http/rendering/RenderSupport.scala index 2ee123ad96..eb972f7ab2 100644 --- a/akka-http-core/src/main/scala/akka/http/rendering/RenderSupport.scala +++ b/akka-http-core/src/main/scala/akka/http/rendering/RenderSupport.scala @@ -4,10 +4,16 @@ package akka.http.rendering +import org.reactivestreams.api.Producer +import scala.collection.immutable import akka.parboiled2.CharUtils -import akka.http.model.{ HttpEntity, HttpHeader } -import akka.http.util._ import akka.util.ByteString +import akka.event.LoggingAdapter +import akka.stream.impl.SynchronousProducerFromIterable +import akka.stream.scaladsl.Flow +import akka.stream.{ FlowMaterializer, Transformer } +import akka.http.model._ +import akka.http.util._ /** * INTERNAL API @@ -25,7 +31,46 @@ private object RenderSupport { val defaultLastChunkBytes: ByteString = renderChunk(HttpEntity.LastChunk) - def renderChunk(chunk: HttpEntity.ChunkStreamPart): ByteString = { + def renderEntityContentType(r: Rendering, entity: HttpEntity): Unit = + if (entity.contentType != ContentTypes.NoContentType) + r ~~ headers.`Content-Type` ~~ entity.contentType ~~ CrLf + + def renderByteStrings(r: ByteStringRendering, entityBytes: ⇒ Producer[ByteString], materializer: FlowMaterializer, + skipEntity: Boolean = false): immutable.Seq[Producer[ByteString]] = { + val messageStart = SynchronousProducerFromIterable(r.get :: Nil) + val messageBytes = + if (!skipEntity) Flow(messageStart).concat(entityBytes).toProducer(materializer) + else messageStart + messageBytes :: Nil + } + + class ChunkTransformer extends Transformer[HttpEntity.ChunkStreamPart, ByteString] { + var lastChunkSeen = false + def onNext(chunk: HttpEntity.ChunkStreamPart): immutable.Seq[ByteString] = { + if (chunk.isLastChunk) lastChunkSeen = true + renderChunk(chunk) :: Nil + } + override def isComplete = lastChunkSeen + override def onTermination(e: Option[Throwable]) = if (lastChunkSeen) Nil else defaultLastChunkBytes :: Nil + } + + class CheckContentLengthTransformer(length: Long) extends Transformer[ByteString, ByteString] { + var sent = 0L + def onNext(elem: ByteString): immutable.Seq[ByteString] = { + sent += elem.length + if (sent > length) + throw new InvalidContentLengthException(s"HTTP message had declared Content-Length $length but entity chunk stream amounts to more bytes") + elem :: Nil + } + + override def onTermination(e: Option[Throwable]): immutable.Seq[ByteString] = { + if (sent < length) + throw new InvalidContentLengthException(s"HTTP message had declared Content-Length $length but entity chunk stream amounts to ${length - sent} bytes less") + Nil + } + } + + private def renderChunk(chunk: HttpEntity.ChunkStreamPart): ByteString = { import chunk._ val renderedSize = // buffer space required for rendering (without trailer) CharUtils.numberOfHexDigits(data.length) + @@ -44,4 +89,8 @@ private object RenderSupport { r ~~ CrLf r.get } + + def suppressionWarning(log: LoggingAdapter, h: HttpHeader, + msg: String = "the akka-http-core layer sets this header automatically!"): Unit = + log.warning("Explicitly set HTTP header '{}' is ignored, {}", h, msg) } diff --git a/akka-http-core/src/main/scala/akka/http/server/HttpListener.scala b/akka-http-core/src/main/scala/akka/http/server/HttpListener.scala deleted file mode 100644 index 27c0cf4583..0000000000 --- a/akka-http-core/src/main/scala/akka/http/server/HttpListener.scala +++ /dev/null @@ -1,134 +0,0 @@ -/** - * Copyright (C) 2009-2014 Typesafe Inc. - */ - -package akka.http.server - -import scala.concurrent.duration._ -import akka.io.{ Tcp, IO } -import akka.stream.io.{ TcpListenStreamActor, StreamTcp } -import akka.stream.{ Transformer, FlowMaterializer } -import akka.stream.scaladsl.Flow -import akka.http.util.Timestamp -import akka.http.{ Http, HttpExt } -import akka.actor._ - -/** - * INTERNAL API - */ -private[http] class HttpListener(bindCommander: ActorRef, - bind: Http.Bind, - httpSettings: HttpExt#Settings) extends Actor with ActorLogging { - import HttpListener._ - import bind._ - - private val settings = bind.serverSettings getOrElse ServerSettings(context.system) - - log.debug("Binding to {}", endpoint) - - IO(StreamTcp)(context.system) ! StreamTcp.Bind(materializerSettings, endpoint, backlog, options) - - context.setReceiveTimeout(settings.bindTimeout) - - val httpServerPipeline = new HttpServerPipeline(settings, FlowMaterializer(materializerSettings), log) - - // we cannot sensibly recover from crashes - override def supervisorStrategy = SupervisorStrategy.stoppingStrategy - - def receive = binding - - def binding: Receive = { - case StreamTcp.TcpServerBinding(localAddress, connectionStream) ⇒ - log.info("Bound to {}", endpoint) - val materializer = FlowMaterializer(materializerSettings) - val httpConnectionStream = Flow(connectionStream) - .map(httpServerPipeline) - .transform { - new Transformer[Http.IncomingConnection, Http.IncomingConnection] { - def onNext(element: Http.IncomingConnection) = element :: Nil - override def cleanup() = shutdown(gracePeriod = Duration.Zero) - } - }.toProducer(materializer) - bindCommander ! Http.ServerBinding(localAddress, httpConnectionStream) - context.setReceiveTimeout(Duration.Undefined) - context.become(connected(sender())) - - case Status.Failure(_: TcpListenStreamActor.TcpListenStreamException) ⇒ - log.warning("Bind to {} failed", endpoint) - bindCommander ! Status.Failure(Http.BindFailedException) - context.stop(self) - - case ReceiveTimeout ⇒ - log.warning("Bind to {} failed, timeout {} expired", endpoint, settings.bindTimeout) - bindCommander ! Status.Failure(Http.BindFailedException) - context.stop(self) - - case Http.Unbind(_) ⇒ // no children possible, so no reason to note the timeout - log.info("Bind to {} aborted", endpoint) - bindCommander ! Status.Failure(Http.BindFailedException) - context.become(bindingAborted(Set(sender()))) - } - - /** Waiting for the bind to execute to close it down instantly afterwards */ - def bindingAborted(unbindCommanders: Set[ActorRef]): Receive = { - case _: StreamTcp.TcpServerBinding ⇒ - unbind(sender(), unbindCommanders, Duration.Zero) - - case Status.Failure(_: TcpListenStreamActor.TcpListenStreamException) ⇒ - unbindCommanders foreach (_ ! Http.Unbound) - context.stop(self) - - case ReceiveTimeout ⇒ - unbindCommanders foreach (_ ! Http.Unbound) - context.stop(self) - - case Http.Unbind(_) ⇒ context.become(bindingAborted(unbindCommanders + sender())) - } - - def connected(tcpListener: ActorRef): Receive = { - case Http.Unbind(timeout) ⇒ unbind(tcpListener, Set(sender()), timeout) - } - - def unbind(tcpListener: ActorRef, unbindCommanders: Set[ActorRef], timeout: Duration): Unit = { - tcpListener ! Tcp.Unbind - context.setReceiveTimeout(settings.unbindTimeout) - context.become(unbinding(unbindCommanders, timeout)) - } - - def unbinding(commanders: Set[ActorRef], gracePeriod: Duration): Receive = { - case Tcp.Unbound ⇒ - log.info("Unbound from {}", endpoint) - commanders foreach (_ ! Http.Unbound) - shutdown(gracePeriod) - - case ReceiveTimeout ⇒ - log.warning("Unbinding from {} failed, timeout {} expired, stopping", endpoint, settings.unbindTimeout) - commanders foreach (_ ! Status.Failure(Http.UnbindFailedException)) - context.stop(self) - - case Http.Unbind(_) ⇒ - // the first Unbind we received has precedence over ones potentially sent later - context.become(unbinding(commanders + sender(), gracePeriod)) - } - - def shutdown(gracePeriod: Duration): Unit = - if (gracePeriod == Duration.Zero || context.children.nonEmpty) { - context.setReceiveTimeout(Duration.Undefined) - self ! Tick - context.become(inShutdownGracePeriod(Timestamp.now + gracePeriod)) - } else context.stop(self) - - /** Wait for a last grace period to expire before shutting us (and our children down) */ - def inShutdownGracePeriod(timeout: Timestamp): Receive = { - case Tick ⇒ - if (timeout.isPast || context.children.isEmpty) context.stop(self) - else context.system.scheduler.scheduleOnce(1.second, self, Tick)(context.dispatcher) - } -} - -private[http] object HttpListener { - def props(bindCommander: ActorRef, bind: Http.Bind, httpSettings: HttpExt#Settings) = - Props(new HttpListener(bindCommander, bind, httpSettings)) withDispatcher httpSettings.ListenerDispatcher - - private case object Tick -} \ No newline at end of file diff --git a/akka-http-core/src/main/scala/akka/http/server/HttpServerPipeline.scala b/akka-http-core/src/main/scala/akka/http/server/HttpServerPipeline.scala index 340bcedb5d..1f7dd2272f 100644 --- a/akka-http-core/src/main/scala/akka/http/server/HttpServerPipeline.scala +++ b/akka-http-core/src/main/scala/akka/http/server/HttpServerPipeline.scala @@ -6,7 +6,6 @@ package akka.http.server import org.reactivestreams.api.Producer import akka.event.LoggingAdapter -import akka.util.ByteString import akka.stream.io.StreamTcp import akka.stream.{ FlattenStrategy, Transformer, FlowMaterializer } import akka.stream.scaladsl.{ Flow, Duct } @@ -15,6 +14,8 @@ import akka.http.rendering.{ ResponseRenderingContext, HttpResponseRendererFacto import akka.http.model.{ StatusCode, ErrorInfo, HttpRequest, HttpResponse } import akka.http.parsing.ParserOutput._ import akka.http.Http +import akka.http.util._ +import akka.http.model.headers.Host /** * INTERNAL API @@ -23,7 +24,6 @@ private[http] class HttpServerPipeline(settings: ServerSettings, materializer: FlowMaterializer, log: LoggingAdapter) extends (StreamTcp.IncomingTcpConnection ⇒ Http.IncomingConnection) { - import HttpServerPipeline._ val rootParser = new HttpRequestParser(settings.parserSettings, settings.rawRequestUriHeader, materializer)() val warnOnIllegalHeader: ErrorInfo ⇒ Unit = errorInfo ⇒ @@ -45,7 +45,11 @@ private[http] class HttpServerPipeline(settings: ServerSettings, .splitWhen(_.isInstanceOf[MessageStart]) .headAndTail(materializer) .tee(applicationBypassConsumer) - .collect { case (x: RequestStart, entityParts) ⇒ HttpServerPipeline.constructRequest(x, entityParts) } + .collect { + case (RequestStart(method, uri, protocol, headers, createEntity, _), entityParts) ⇒ + val effectiveUri = HttpRequest.effectiveUri(uri, headers, securedConnection = false, settings.defaultHostHeader) + HttpRequest(method, effectiveUri, headers, createEntity(entityParts), protocol) + } .toProducer(materializer) val responseConsumer = @@ -54,12 +58,8 @@ private[http] class HttpServerPipeline(settings: ServerSettings, .transform(applyApplicationBypass) .transform(responseRendererFactory.newRenderer) .flatten(FlattenStrategy.concat) - .transform { - new Transformer[ByteString, ByteString] { - def onNext(element: ByteString) = element :: Nil - override def onError(cause: Throwable): Unit = log.error(cause, "Response stream error") - } - }.produceTo(materializer, tcpConn.outputStream) + .transform(errorLogger(log, "Outgoing response stream error")) + .produceTo(materializer, tcpConn.outputStream) Http.IncomingConnection(tcpConn.remoteAddress, requestProducer, responseConsumer) } @@ -109,18 +109,4 @@ private[http] class HttpServerPipeline(settings: ServerSettings, ResponseRenderingContext(HttpResponse(status, entity = msg), closeAfterResponseCompletion = true) } } -} - -private[http] object HttpServerPipeline { - def constructRequest(requestStart: RequestStart, entityParts: Producer[RequestOutput]): HttpRequest = { - import requestStart._ - HttpRequest(method, uri, headers, createEntity(entityParts), protocol) - } - - implicit class FlowWithHeadAndTail[T](val underlying: Flow[Producer[T]]) { - def headAndTail(materializer: FlowMaterializer): Flow[(T, Producer[T])] = - underlying.map { p ⇒ - Flow(p).prefixAndTail(1).map { case (prefix, tail) ⇒ (prefix.head, tail) }.toProducer(materializer) - }.flatten(FlattenStrategy.Concat()) - } } \ No newline at end of file diff --git a/akka-http-core/src/main/scala/akka/http/server/ServerSettings.scala b/akka-http-core/src/main/scala/akka/http/server/ServerSettings.scala index bc95bdd974..b14db09deb 100644 --- a/akka-http-core/src/main/scala/akka/http/server/ServerSettings.scala +++ b/akka-http-core/src/main/scala/akka/http/server/ServerSettings.scala @@ -7,6 +7,7 @@ package akka.http.server import language.implicitConversions import com.typesafe.config.Config import scala.concurrent.duration._ +import akka.actor.ActorRefFactory import akka.http.parsing.ParserSettings import akka.http.model.parser.HeaderParser import akka.http.model.headers.{ Server, Host, RawHeader } @@ -15,64 +16,35 @@ import akka.ConfigurationException final case class ServerSettings( serverHeader: Option[Server], - sslEncryption: Boolean, - pipeliningLimit: Int, timeouts: ServerSettings.Timeouts, - timeoutHandler: String, - reapingCycle: Duration, remoteAddressHeader: Boolean, rawRequestUriHeader: Boolean, transparentHeadRequests: Boolean, verboseErrorMessages: Boolean, responseHeaderSizeHint: Int, - maxEncryptionChunkSize: Int, defaultHostHeader: Host, parserSettings: ParserSettings) { - require(reapingCycle >= Duration.Zero, "reapingCycle must be > 0 or 'infinite'") - require(0 <= pipeliningLimit && pipeliningLimit <= 128, "pipelining-limit must be >= 0 and <= 128") require(0 <= responseHeaderSizeHint, "response-size-hint must be > 0") - require(0 < maxEncryptionChunkSize, "max-encryption-chunk-size must be > 0") } object ServerSettings extends SettingsCompanion[ServerSettings]("akka.http.server") { final case class Timeouts(idleTimeout: Duration, - requestTimeout: Duration, - timeoutTimeout: Duration, - bindTimeout: Duration, - unbindTimeout: Duration, - parseErrorAbortTimeout: Duration) { - require(idleTimeout >= Duration.Zero, "idleTimeout must be > 0 or 'infinite'") - require(requestTimeout >= Duration.Zero, "requestTimeout must be > 0 or 'infinite'") - require(timeoutTimeout >= Duration.Zero, "timeoutTimeout must be > 0 or 'infinite'") - require(bindTimeout >= Duration.Zero, "bindTimeout must be > 0 or 'infinite'") - require(unbindTimeout >= Duration.Zero, "unbindTimeout must be > 0 or 'infinite'") - require(!requestTimeout.isFinite || idleTimeout > requestTimeout, - "idle-timeout must be > request-timeout (if the latter is not 'infinite')") - require(!idleTimeout.isFinite || idleTimeout > 1.second, // the current implementation is not fit for - "an idle-timeout < 1 second is not supported") // very short idle-timeout settings + bindTimeout: FiniteDuration) { + require(bindTimeout >= Duration.Zero, "bindTimeout must be > 0") } implicit def timeoutsShortcut(s: ServerSettings): Timeouts = s.timeouts def fromSubConfig(c: Config) = apply( c.getString("server-header").toOption.map(Server(_)), - c getBoolean "ssl-encryption", - c.getString("pipelining-limit") match { case "disabled" ⇒ 0; case _ ⇒ c getInt "pipelining-limit" }, Timeouts( c getPotentiallyInfiniteDuration "idle-timeout", - c getPotentiallyInfiniteDuration "request-timeout", - c getPotentiallyInfiniteDuration "timeout-timeout", - c getPotentiallyInfiniteDuration "bind-timeout", - c getPotentiallyInfiniteDuration "unbind-timeout", - c getPotentiallyInfiniteDuration "parse-error-abort-timeout"), - c getString "timeout-handler", - c getPotentiallyInfiniteDuration "reaping-cycle", + c getFiniteDuration "bind-timeout"), c getBoolean "remote-address-header", c getBoolean "raw-request-uri-header", c getBoolean "transparent-head-requests", c getBoolean "verbose-error-messages", c getIntBytes "response-header-size-hint", - c getIntBytes "max-encryption-chunk-size", defaultHostHeader = HeaderParser.parseHeader(RawHeader("Host", c getString "default-host-header")) match { case Right(x: Host) ⇒ x @@ -80,5 +52,8 @@ object ServerSettings extends SettingsCompanion[ServerSettings]("akka.http.serve case Right(_) ⇒ throw new IllegalStateException }, ParserSettings fromSubConfig c.getConfig("parsing")) + + def apply(optionalSettings: Option[ServerSettings])(implicit actorRefFactory: ActorRefFactory): ServerSettings = + optionalSettings getOrElse apply(actorSystem) } diff --git a/akka-http-core/src/main/scala/akka/http/util/EnhancedConfig.scala b/akka-http-core/src/main/scala/akka/http/util/EnhancedConfig.scala index 5b6e0d3571..41e66aaf6f 100644 --- a/akka-http-core/src/main/scala/akka/http/util/EnhancedConfig.scala +++ b/akka-http-core/src/main/scala/akka/http/util/EnhancedConfig.scala @@ -4,7 +4,7 @@ package akka.http.util -import scala.concurrent.duration.Duration +import scala.concurrent.duration.{ FiniteDuration, Duration } import com.typesafe.config.Config import akka.ConfigurationException @@ -18,6 +18,11 @@ private[http] class EnhancedConfig(val underlying: Config) extends AnyVal { case x ⇒ Duration(x) } + def getFiniteDuration(path: String): FiniteDuration = Duration(underlying.getString(path)) match { + case x: FiniteDuration ⇒ x + case _ ⇒ throw new ConfigurationException(s"Config setting '$path' must be a finite duration") + } + def getPossiblyInfiniteInt(path: String): Int = underlying.getString(path) match { case "infinite" ⇒ Int.MaxValue case x ⇒ underlying.getInt(path) diff --git a/akka-http-core/src/main/scala/akka/http/util/Rendering.scala b/akka-http-core/src/main/scala/akka/http/util/Rendering.scala index d7d3e1c75a..0fd6c0385b 100644 --- a/akka-http-core/src/main/scala/akka/http/util/Rendering.scala +++ b/akka-http-core/src/main/scala/akka/http/util/Rendering.scala @@ -132,7 +132,7 @@ private[http] object Renderer { /** * INTERNAL API * - * The interface for a rendering sink. May be implemented with different backing stores. + * The interface for a rendering sink. Implemented for several serialization targets. */ private[http] trait Rendering { def ~~(ch: Char): this.type diff --git a/akka-http-core/src/main/scala/akka/http/util/SettingsCompanion.scala b/akka-http-core/src/main/scala/akka/http/util/SettingsCompanion.scala index bc7f3fe1d1..83ac16b4c1 100644 --- a/akka-http-core/src/main/scala/akka/http/util/SettingsCompanion.scala +++ b/akka-http-core/src/main/scala/akka/http/util/SettingsCompanion.scala @@ -49,7 +49,7 @@ private[http] abstract class SettingsCompanion[T](prefix: String) { object SettingsCompanion { lazy val configAdditions: Config = { val localHostName = - try InetAddress.getLocalHost.getHostName + try InetAddress.getLocalHost.getHostName // TODO: upgrade to `getHostString` once we are on JDK7 catch { case NonFatal(_) ⇒ "" } ConfigFactory.parseMap(Map("akka.http.hostname" -> localHostName).asJava) } diff --git a/akka-http-core/src/main/scala/akka/http/util/package.scala b/akka-http-core/src/main/scala/akka/http/util/package.scala index 6a3e1a766f..bbdf9084d2 100644 --- a/akka-http-core/src/main/scala/akka/http/util/package.scala +++ b/akka-http-core/src/main/scala/akka/http/util/package.scala @@ -5,10 +5,19 @@ package akka.http import language.implicitConversions +import java.net.InetSocketAddress +import java.nio.channels.ServerSocketChannel +import java.nio.charset.Charset import com.typesafe.config.Config +import org.reactivestreams.api.Producer +import akka.event.LoggingAdapter +import akka.util.ByteString import akka.actor.{ ActorRefFactory, ActorContext, ActorSystem } +import akka.stream.scaladsl.Flow +import akka.stream.{ Transformer, FlattenStrategy, FlowMaterializer } package object util { + private[http] val UTF8 = Charset.forName("UTF8") private[http] def actorSystem(implicit refFactory: ActorRefFactory): ActorSystem = refFactory match { @@ -20,5 +29,34 @@ package object util { private[http] implicit def enhanceByteArray(array: Array[Byte]): EnhancedByteArray = new EnhancedByteArray(array) private[http] implicit def enhanceConfig(config: Config): EnhancedConfig = new EnhancedConfig(config) private[http] implicit def enhanceString_(s: String): EnhancedString = new EnhancedString(s) + + private[http] implicit class FlowWithHeadAndTail[T](val underlying: Flow[Producer[T]]) extends AnyVal { + def headAndTail(materializer: FlowMaterializer): Flow[(T, Producer[T])] = + underlying.map { p ⇒ + Flow(p).prefixAndTail(1).map { case (prefix, tail) ⇒ (prefix.head, tail) }.toProducer(materializer) + }.flatten(FlattenStrategy.Concat()) + } + + private[http] implicit class FlowWithPrintEvent[T](val underlying: Flow[T]) { + def printEvent(marker: String): Flow[T] = + underlying.transform { + new Transformer[T, T] { + def onNext(element: T) = { + println(s"$marker: $element") + element :: Nil + } + override def onTermination(e: Option[Throwable]) = { + println(s"$marker: Terminated with error $e") + Nil + } + } + } + } + + private[http] def errorLogger(log: LoggingAdapter, msg: String): Transformer[ByteString, ByteString] = + new Transformer[ByteString, ByteString] { + def onNext(element: ByteString) = element :: Nil + override def onError(cause: Throwable): Unit = log.error(cause, msg) + } } diff --git a/akka-http-core/src/test/resources/reference.conf b/akka-http-core/src/test/resources/reference.conf new file mode 100644 index 0000000000..994e7c97f2 --- /dev/null +++ b/akka-http-core/src/test/resources/reference.conf @@ -0,0 +1,2 @@ +# override strange reference.conf setting in akka-stream test scope +akka.actor.default-mailbox.mailbox-type = akka.dispatch.UnboundedMailbox \ No newline at end of file diff --git a/akka-http-core/src/test/scala/akka/http/ClientServerSpec.scala b/akka-http-core/src/test/scala/akka/http/ClientServerSpec.scala new file mode 100644 index 0000000000..5fdf0d3b19 --- /dev/null +++ b/akka-http-core/src/test/scala/akka/http/ClientServerSpec.scala @@ -0,0 +1,172 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ + +package akka.http + +import java.net.Socket +import java.io.{ InputStreamReader, BufferedReader, OutputStreamWriter, BufferedWriter } +import com.typesafe.config.{ ConfigFactory, Config } +import scala.annotation.tailrec +import scala.concurrent.duration._ +import scala.concurrent.Await +import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpec } +import akka.actor.ActorSystem +import akka.testkit.TestProbe +import akka.io.IO +import akka.stream.{ FlowMaterializer, MaterializerSettings } +import akka.stream.testkit.{ ProducerProbe, ConsumerProbe, StreamTestKit } +import akka.stream.impl.SynchronousProducerFromIterable +import akka.stream.scaladsl.Flow +import akka.http.server.ServerSettings +import akka.http.client.ClientConnectionSettings +import akka.http.model._ +import akka.http.util._ +import headers._ +import HttpMethods._ +import HttpEntity._ +import TestUtils._ + +class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll { + val testConf: Config = ConfigFactory.parseString(""" + akka.event-handlers = ["akka.testkit.TestEventListener"] + akka.loglevel = WARNING""") + implicit val system = ActorSystem(getClass.getSimpleName, testConf) + import system.dispatcher + + val materializerSettings = MaterializerSettings(dispatcher = "akka.test.stream-dispatcher") + val materializer = FlowMaterializer(materializerSettings) + + "The server-side HTTP infrastructure" should { + + "properly bind and unbind a server" in { + val (hostname, port) = temporaryServerHostnameAndPort() + val commander = TestProbe() + commander.send(IO(Http), Http.Bind(hostname, port, materializerSettings = materializerSettings)) + + val Http.ServerBinding(localAddress, connectionStream) = commander.expectMsgType[Http.ServerBinding] + localAddress.getHostName shouldEqual hostname + localAddress.getPort shouldEqual port + + val c = StreamTestKit.consumerProbe[Http.IncomingConnection] + connectionStream.produceTo(c) + val sub = c.expectSubscription() + + sub.cancel() + // TODO: verify unbinding effect + } + + "properly complete a simple request/response cycle" in new TestSetup { + val (clientOut, clientIn) = openNewClientConnection[Symbol]() + val (serverIn, serverOut) = acceptConnection() + + val clientOutSub = clientOut.expectSubscription() + clientOutSub.sendNext(HttpRequest(uri = "/abc") -> 'abcContext) + + val serverInSub = serverIn.expectSubscription() + serverInSub.requestMore(1) + serverIn.expectNext().uri shouldEqual Uri(s"http://$hostname:$port/abc") + + val serverOutSub = serverOut.expectSubscription() + serverOutSub.sendNext(HttpResponse(entity = "yeah")) + + val clientInSub = clientIn.expectSubscription() + clientInSub.requestMore(1) + val (response, 'abcContext) = clientIn.expectNext() + toStrict(response.entity) shouldEqual HttpEntity("yeah") + } + + "properly complete a chunked request/response cycle" in new TestSetup { + val (clientOut, clientIn) = openNewClientConnection[Long]() + val (serverIn, serverOut) = acceptConnection() + + val chunks = List(Chunk("abc"), Chunk("defg"), Chunk("hijkl"), LastChunk) + val chunkedContentType: ContentType = MediaTypes.`application/base64` + val chunkedEntity = HttpEntity.Chunked(chunkedContentType, SynchronousProducerFromIterable(chunks)) + + val clientOutSub = clientOut.expectSubscription() + clientOutSub.sendNext(HttpRequest(POST, "/chunked", List(Accept(MediaRanges.`*/*`)), chunkedEntity) -> 12345678) + + val serverInSub = serverIn.expectSubscription() + serverInSub.requestMore(1) + private val HttpRequest(POST, uri, List(`User-Agent`(_), Host(_, _), Accept(Vector(MediaRanges.`*/*`))), + Chunked(`chunkedContentType`, chunkStream), HttpProtocols.`HTTP/1.1`) = serverIn.expectNext() + uri shouldEqual Uri(s"http://$hostname:$port/chunked") + Await.result(Flow(chunkStream).grouped(4).toFuture(materializer), 100.millis) shouldEqual chunks + + val serverOutSub = serverOut.expectSubscription() + serverOutSub.sendNext(HttpResponse(206, List(RawHeader("Age", "42")), chunkedEntity)) + + val clientInSub = clientIn.expectSubscription() + clientInSub.requestMore(1) + val (HttpResponse(StatusCodes.PartialContent, List(Date(_), Server(_), RawHeader("Age", "42")), + Chunked(`chunkedContentType`, chunkStream2), HttpProtocols.`HTTP/1.1`), 12345678) = clientIn.expectNext() + Await.result(Flow(chunkStream2).grouped(1000).toFuture(materializer), 100.millis) shouldEqual chunks + } + + } + + override def afterAll() = system.shutdown() + + class TestSetup { + val (hostname, port) = temporaryServerHostnameAndPort() + val bindHandler = TestProbe() + def configOverrides = "" + + // automatically bind a server + val connectionStream: ConsumerProbe[Http.IncomingConnection] = { + val commander = TestProbe() + val settings = configOverrides.toOption.map(ServerSettings.apply) + commander.send(IO(Http), Http.Bind(hostname, port, serverSettings = settings, materializerSettings = materializerSettings)) + val probe = StreamTestKit.consumerProbe[Http.IncomingConnection] + commander.expectMsgType[Http.ServerBinding].connectionStream.produceTo(probe) + probe + } + val connectionStreamSub = connectionStream.expectSubscription() + + def openNewClientConnection[T](settings: Option[ClientConnectionSettings] = None): (ProducerProbe[(HttpRequest, T)], ConsumerProbe[(HttpResponse, T)]) = { + val commander = TestProbe() + commander.send(IO(Http), Http.Connect(hostname, port, settings = settings, materializerSettings = materializerSettings)) + val connection = commander.expectMsgType[Http.OutgoingConnection] + connection.remoteAddress.getPort shouldEqual port + connection.remoteAddress.getHostName shouldEqual hostname + val requestProducerProbe = StreamTestKit.producerProbe[(HttpRequest, T)] + val responseConsumerProbe = StreamTestKit.consumerProbe[(HttpResponse, T)] + requestProducerProbe.produceTo(connection.processor[T]) + connection.processor[T].produceTo(responseConsumerProbe) + requestProducerProbe -> responseConsumerProbe + } + + def acceptConnection(): (ConsumerProbe[HttpRequest], ProducerProbe[HttpResponse]) = { + connectionStreamSub.requestMore(1) + val Http.IncomingConnection(_, requestProducer, responseConsumer) = connectionStream.expectNext() + val requestConsumerProbe = StreamTestKit.consumerProbe[HttpRequest] + val responseProducerProbe = StreamTestKit.producerProbe[HttpResponse] + requestProducer.produceTo(requestConsumerProbe) + responseProducerProbe.produceTo(responseConsumer) + requestConsumerProbe -> responseProducerProbe + } + + def openClientSocket() = new Socket(hostname, port) + + def write(socket: Socket, data: String) = { + val writer = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream)) + writer.write(data) + writer.flush() + writer + } + + def readAll(socket: Socket)(reader: BufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream))): (String, BufferedReader) = { + val sb = new java.lang.StringBuilder + val cbuf = new Array[Char](256) + @tailrec def drain(): (String, BufferedReader) = reader.read(cbuf) match { + case -1 ⇒ sb.toString -> reader + case n ⇒ sb.append(cbuf, 0, n); drain() + } + drain() + } + } + + def toStrict(entity: HttpEntity): HttpEntity.Strict = + Await.result(entity.toStrict(500.millis, materializer), 1.second) +} \ No newline at end of file diff --git a/akka-http-core/src/test/scala/akka/http/TestClient.scala b/akka-http-core/src/test/scala/akka/http/TestClient.scala new file mode 100644 index 0000000000..a1f7064673 --- /dev/null +++ b/akka-http-core/src/test/scala/akka/http/TestClient.scala @@ -0,0 +1,49 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ + +package akka.http + +import com.typesafe.config.{ ConfigFactory, Config } +import scala.concurrent.Future +import scala.util.{ Failure, Success } +import scala.concurrent.duration._ +import akka.util.Timeout +import akka.stream.{ MaterializerSettings, FlowMaterializer } +import akka.stream.scaladsl.Flow +import akka.io.IO +import akka.actor.ActorSystem +import akka.pattern.ask +import akka.http.model._ +import HttpMethods._ + +object TestClient extends App { + val testConf: Config = ConfigFactory.parseString(""" + akka.loglevel = INFO + akka.log-dead-letters = off + """) + implicit val system = ActorSystem("ServerTest", testConf) + import system.dispatcher + + val materializer = FlowMaterializer(MaterializerSettings()) + implicit val askTimeout: Timeout = 500.millis + val host = "spray.io" + + println(s"Fetching HTTP server version of host `$host` ...") + + val result = for { + connection ← IO(Http).ask(Http.Connect(host)).mapTo[Http.OutgoingConnection] + response ← sendRequest(HttpRequest(GET, uri = "/"), connection) + } yield response.header[headers.Server] + + def sendRequest(request: HttpRequest, connection: Http.OutgoingConnection): Future[HttpResponse] = { + Flow(List(HttpRequest() -> 'NoContext)).produceTo(materializer, connection.processor) + Flow(connection.processor).map(_._1).toFuture(materializer) + } + + result onComplete { + case Success(res) ⇒ println(s"$host is running ${res mkString ", "}") + case Failure(error) ⇒ println(s"Error: $error") + } + result onComplete { _ ⇒ system.shutdown() } +} \ No newline at end of file diff --git a/akka-http-core/src/test/scala/akka/http/TestUtils.scala b/akka-http-core/src/test/scala/akka/http/TestUtils.scala new file mode 100644 index 0000000000..d6f1265be2 --- /dev/null +++ b/akka-http-core/src/test/scala/akka/http/TestUtils.scala @@ -0,0 +1,24 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ + +package akka.http + +import java.net.InetSocketAddress +import java.nio.channels.ServerSocketChannel + +object TestUtils { + def temporaryServerAddress(interface: String = "127.0.0.1"): InetSocketAddress = { + val serverSocket = ServerSocketChannel.open() + try { + serverSocket.socket.bind(new InetSocketAddress(interface, 0)) + val port = serverSocket.socket.getLocalPort + new InetSocketAddress(interface, port) + } finally serverSocket.close() + } + + def temporaryServerHostnameAndPort(interface: String = "127.0.0.1"): (String, Int) = { + val socketAddress = temporaryServerAddress(interface) + socketAddress.getHostName -> socketAddress.getPort + } +} diff --git a/akka-http-core/src/test/scala/akka/http/parsing/RequestParserSpec.scala b/akka-http-core/src/test/scala/akka/http/parsing/RequestParserSpec.scala index 9f3bb33ad9..0af73b257f 100644 --- a/akka-http-core/src/test/scala/akka/http/parsing/RequestParserSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/parsing/RequestParserSpec.scala @@ -75,8 +75,8 @@ class RequestParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll { |Content-length: 17 | |Shake your BOODY!""" should parseTo { - HttpRequest(POST, "/resource/yes", List(`Content-Length`(17), `Content-Type`(ContentTypes.`text/plain(UTF-8)`), - Connection("keep-alive"), `User-Agent`("curl/7.19.7 xyz")), "Shake your BOODY!", `HTTP/1.0`) + HttpRequest(POST, "/resource/yes", List(Connection("keep-alive"), `User-Agent`("curl/7.19.7 xyz")), + "Shake your BOODY!", `HTTP/1.0`) } closeAfterResponseCompletion shouldEqual Seq(false) } @@ -90,9 +90,8 @@ class RequestParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll { |Shake your BOODY!GET / HTTP/1.0 | |""" should parseTo( - HttpRequest(POST, "/resource/yes", List(`Content-Length`(17), Connection("keep-alive"), - `User-Agent`("curl/7.19.7 xyz")), HttpEntity(ContentTypes.`application/octet-stream`, "Shake your BOODY!"), - `HTTP/1.0`), + HttpRequest(POST, "/resource/yes", List(Connection("keep-alive"), `User-Agent`("curl/7.19.7 xyz")), + "Shake your BOODY!".getBytes, `HTTP/1.0`), HttpRequest(protocol = `HTTP/1.0`)) closeAfterResponseCompletion shouldEqual Seq(false, true) } @@ -121,8 +120,7 @@ class RequestParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll { | |ABCDPATCH""" }.toCharArray.map(_.toString).toSeq should rawMultiParseTo( - HttpRequest(PUT, "/resource/yes", List(Host("x"), `Content-Length`(4)), - HttpEntity(ContentTypes.`application/octet-stream`, "ABCD"))) + HttpRequest(PUT, "/resource/yes", List(Host("x")), "ABCD".getBytes)) closeAfterResponseCompletion shouldEqual Seq(false) } @@ -140,10 +138,7 @@ class RequestParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll { |Content-Type: application/pdf |Content-Length: 0 | - |""" should parseTo { - HttpRequest(GET, "/data", List(`Content-Length`(0), `Content-Type`(`application/pdf`), Host("x")), - HttpEntity(`application/pdf`, "")) - } + |""" should parseTo(HttpRequest(GET, "/data", List(Host("x")), HttpEntity(`application/pdf`, ""))) closeAfterResponseCompletion shouldEqual Seq(false) } @@ -156,8 +151,7 @@ class RequestParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll { |Host: ping | |""" - val baseRequest = HttpRequest(PATCH, "/data", List(Host("ping"), `Content-Type`(`application/pdf`), - Connection("lalelu"), `Transfer-Encoding`(TransferEncodings.chunked))) + val baseRequest = HttpRequest(PATCH, "/data", List(Host("ping"), Connection("lalelu"))) "request start" in new Test { Seq(start, "rest") should generalMultiParseTo( @@ -219,8 +213,8 @@ class RequestParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll { |Host: ping | |""" - val baseRequest = HttpRequest(PATCH, "/data", List(Host("ping"), Connection("lalelu"), - `Transfer-Encoding`(TransferEncodings.chunked)), HttpEntity.Chunked(`application/octet-stream`, producer())) + val baseRequest = HttpRequest(PATCH, "/data", List(Host("ping"), Connection("lalelu")), + HttpEntity.Chunked(`application/octet-stream`, producer())) "an illegal char after chunk size" in new Test { Seq(start, @@ -331,20 +325,18 @@ class RequestParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll { override def afterAll() = system.shutdown() - private[http] class Test { + private class Test { var closeAfterResponseCompletion = Seq.empty[Boolean] def parseTo(expected: HttpRequest*): Matcher[String] = multiParseTo(expected: _*).compose(_ :: Nil) def multiParseTo(expected: HttpRequest*): Matcher[Seq[String]] = multiParseTo(newParser, expected: _*) - def multiParseTo(parser: HttpRequestParser, expected: HttpRequest*): Matcher[Seq[String]] = rawMultiParseTo(parser, expected: _*).compose(_ map prep) def rawMultiParseTo(expected: HttpRequest*): Matcher[Seq[String]] = rawMultiParseTo(newParser, expected: _*) - def rawMultiParseTo(parser: HttpRequestParser, expected: HttpRequest*): Matcher[Seq[String]] = generalRawMultiParseTo(parser, expected.map(Right(_)): _*) @@ -356,11 +348,9 @@ class RequestParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll { def generalRawMultiParseTo(expected: Either[ParseError, HttpRequest]*): Matcher[Seq[String]] = generalRawMultiParseTo(newParser, expected: _*) - def generalRawMultiParseTo(parser: HttpRequestParser, expected: Either[ParseError, HttpRequest]*): Matcher[Seq[String]] = equal(expected).matcher[Seq[Either[ParseError, HttpRequest]]] compose { input: Seq[String] ⇒ - import HttpServerPipeline._ val future = Flow(input.toList) .map(ByteString.apply) @@ -368,9 +358,9 @@ class RequestParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll { .splitWhen(_.isInstanceOf[ParserOutput.MessageStart]) .headAndTail(materializer) .collect { - case (x: ParserOutput.RequestStart, entityParts) ⇒ - closeAfterResponseCompletion :+= x.closeAfterResponseCompletion - Right(HttpServerPipeline.constructRequest(x, entityParts)) + case (ParserOutput.RequestStart(method, uri, protocol, headers, createEntity, close), entityParts) ⇒ + closeAfterResponseCompletion :+= close + Right(HttpRequest(method, uri, headers, createEntity(entityParts), protocol)) case (x: ParseError, _) ⇒ Left(x) } .map { x ⇒ diff --git a/akka-http-core/src/test/scala/akka/http/parsing/ResponseParserSpec.scala b/akka-http-core/src/test/scala/akka/http/parsing/ResponseParserSpec.scala new file mode 100644 index 0000000000..c62569928d --- /dev/null +++ b/akka-http-core/src/test/scala/akka/http/parsing/ResponseParserSpec.scala @@ -0,0 +1,260 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ + +package akka.http.parsing + +import com.typesafe.config.{ ConfigFactory, Config } +import scala.concurrent.{ Future, Await } +import scala.concurrent.duration._ +import org.scalatest.{ Tag, BeforeAndAfterAll, FreeSpec, Matchers } +import org.scalatest.matchers.Matcher +import org.reactivestreams.api.Producer +import akka.stream.scaladsl.Flow +import akka.stream.impl.SynchronousProducerFromIterable +import akka.stream.{ FlattenStrategy, MaterializerSettings, FlowMaterializer } +import akka.util.ByteString +import akka.actor.ActorSystem +import akka.http.client.HttpClientPipeline +import akka.http.util._ +import akka.http.model._ +import headers._ +import MediaTypes._ +import HttpMethods._ +import HttpProtocols._ +import StatusCodes._ +import HttpEntity._ +import ParserOutput.ParseError + +class ResponseParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll { + val testConf: Config = ConfigFactory.parseString(""" + akka.event-handlers = ["akka.testkit.TestEventListener"] + akka.loglevel = WARNING + akka.http.parsing.max-response-reason-length = 21""") + implicit val system = ActorSystem(getClass.getSimpleName, testConf) + import system.dispatcher + + val materializer = FlowMaterializer(MaterializerSettings()) + val ServerOnTheMove = StatusCodes.registerCustom(331, "Server on the move") + + "The response parsing logic should" - { + "properly parse" - { + + // http://tools.ietf.org/html/rfc7230#section-3.3.3 + "a 200 response to a HEAD request" in new Test { + """HTTP/1.1 200 OK + | + |HTT""" should parseTo(HEAD, HttpResponse()) + closeAfterResponseCompletion shouldEqual Seq(false) + } + + // http://tools.ietf.org/html/rfc7230#section-3.3.3 + "a 204 response" in new Test { + """HTTP/1.1 204 OK + | + |""" should parseTo(HttpResponse(NoContent)) + closeAfterResponseCompletion shouldEqual Seq(false) + } + + "a response with a custom status code" in new Test { + """HTTP/1.1 331 Server on the move + |Content-Length: 0 + | + |""" should parseTo(HttpResponse(ServerOnTheMove)) + closeAfterResponseCompletion shouldEqual Seq(false) + } + + "a response with one header, a body, but no Content-Length header" in new Test { + """HTTP/1.0 404 Not Found + |Host: api.example.com + | + |Foobs""" should parseTo(HttpResponse(NotFound, List(Host("api.example.com")), "Foobs".getBytes, `HTTP/1.0`)) + closeAfterResponseCompletion shouldEqual Seq(true) + } + + "a response with one header, no body, and no Content-Length header" in new Test { + """HTTP/1.0 404 Not Found + |Host: api.example.com + | + |""" should parseTo(HttpResponse(NotFound, List(Host("api.example.com")), + HttpEntity.empty(ContentTypes.`application/octet-stream`), `HTTP/1.0`)) + closeAfterResponseCompletion shouldEqual Seq(true) + } + + "a response with 3 headers, a body and remaining content" in new Test { + """HTTP/1.1 500 Internal Server Error + |User-Agent: curl/7.19.7 xyz + |Connection:close + |Content-Length: 17 + |Content-Type: text/plain; charset=UTF-8 + | + |Shake your BOODY!HTTP/1.""" should parseTo(HttpResponse(InternalServerError, List(Connection("close"), + `User-Agent`("curl/7.19.7 xyz")), "Shake your BOODY!")) + closeAfterResponseCompletion shouldEqual Seq(true) + } + + "a split response (parsed byte-by-byte)" in new Test { + prep { + """HTTP/1.1 200 Ok + |Content-Length: 4 + | + |ABCD""" + }.toCharArray.map(_.toString).toSeq should rawMultiParseTo(HttpResponse(entity = "ABCD".getBytes)) + closeAfterResponseCompletion shouldEqual Seq(false) + } + } + + "properly parse a chunked" - { + val start = + """HTTP/1.1 200 OK + |Transfer-Encoding: chunked + |Connection: lalelu + |Content-Type: application/pdf + |Server: spray-can + | + |""" + val baseResponse = HttpResponse(headers = List(Server("spray-can"), Connection("lalelu"))) + + "response start" in new Test { + Seq(start, "rest") should generalMultiParseTo( + Right(baseResponse.withEntity(HttpEntity.Chunked(`application/pdf`, producer()))), + Left("Illegal character 'r' in chunk start")) + closeAfterResponseCompletion shouldEqual Seq(false) + } + + "message chunk with and without extension" in new Test { + Seq(start + + """3 + |abc + |10;some=stuff;bla + |0123456789ABCDEF + |""", + "10;foo=", + """bar + |0123456789ABCDEF + |10 + |0123456789""", + """ABCDEF + |dead""") should generalMultiParseTo( + Right(baseResponse.withEntity(HttpEntity.Chunked(`application/pdf`, producer( + HttpEntity.Chunk(ByteString("abc")), + HttpEntity.Chunk(ByteString("0123456789ABCDEF"), "some=stuff;bla"), + HttpEntity.Chunk(ByteString("0123456789ABCDEF"), "foo=bar"), + HttpEntity.Chunk(ByteString("0123456789ABCDEF"), "")))))) + closeAfterResponseCompletion shouldEqual Seq(false) + } + + "message end" in new Test { + Seq(start, + """0 + | + |""") should generalMultiParseTo( + Right(baseResponse.withEntity(HttpEntity.Chunked(`application/pdf`, producer(HttpEntity.LastChunk))))) + closeAfterResponseCompletion shouldEqual Seq(false) + } + + "message end with extension, trailer and remaining content" in new Test { + Seq(start, + """000;nice=true + |Foo: pip + | apo + |Bar: xyz + | + |HT""") should generalMultiParseTo( + Right(baseResponse.withEntity(HttpEntity.Chunked(`application/pdf`, + producer(HttpEntity.LastChunk("nice=true", List(RawHeader("Bar", "xyz"), RawHeader("Foo", "pip apo")))))))) + closeAfterResponseCompletion shouldEqual Seq(false) + } + } + + "reject a response with" - { + "HTTP version 1.2" in new Test { + Seq("HTTP/1.2 200 OK\r\n") should generalMultiParseTo(Left("The server-side HTTP version is not supported")) + } + + "an illegal status code" in new Test { + Seq("HTTP/1", ".1 2000 Something") should generalMultiParseTo(Left("Illegal response status code")) + } + + "a too-long response status reason" in new Test { + Seq("HTTP/1.1 204 12345678", "90123456789012\r\n") should generalMultiParseTo( + Left("Response reason phrase exceeds the configured limit of 21 characters")) + } + } + } + + override def afterAll() = system.shutdown() + + private class Test { + var closeAfterResponseCompletion = Seq.empty[Boolean] + + def parseTo(expected: HttpResponse*): Matcher[String] = parseTo(GET, expected: _*) + def parseTo(requestMethod: HttpMethod, expected: HttpResponse*): Matcher[String] = + multiParseTo(requestMethod, expected: _*).compose(_ :: Nil) + + def multiParseTo(expected: HttpResponse*): Matcher[Seq[String]] = multiParseTo(GET, expected: _*) + def multiParseTo(requestMethod: HttpMethod, expected: HttpResponse*): Matcher[Seq[String]] = + rawMultiParseTo(requestMethod, expected: _*).compose(_ map prep) + + def rawMultiParseTo(expected: HttpResponse*): Matcher[Seq[String]] = rawMultiParseTo(GET, expected: _*) + def rawMultiParseTo(requestMethod: HttpMethod, expected: HttpResponse*): Matcher[Seq[String]] = + generalRawMultiParseTo(requestMethod, expected.map(Right(_)): _*) + + def parseToError(error: String): Matcher[String] = generalMultiParseTo(Left(error)).compose(_ :: Nil) + + def generalMultiParseTo(expected: Either[String, HttpResponse]*): Matcher[Seq[String]] = + generalRawMultiParseTo(expected: _*).compose(_ map prep) + + def generalRawMultiParseTo(expected: Either[String, HttpResponse]*): Matcher[Seq[String]] = + generalRawMultiParseTo(GET, expected: _*) + def generalRawMultiParseTo(requestMethod: HttpMethod, expected: Either[String, HttpResponse]*): Matcher[Seq[String]] = + equal(expected).matcher[Seq[Either[String, HttpResponse]]] compose { + input: Seq[String] ⇒ + val future = + Flow(input.toList) + .map(ByteString.apply) + .transform(newParser(requestMethod)) + .splitWhen(_.isInstanceOf[ParserOutput.MessageStart]) + .headAndTail(materializer) + .collect { + case (ParserOutput.ResponseStart(statusCode, protocol, headers, createEntity, close), entityParts) ⇒ + closeAfterResponseCompletion :+= close + Right(HttpResponse(statusCode, headers, createEntity(entityParts), protocol)) + case (x: ParseError, _) ⇒ Left(x) + }.map { x ⇒ + Flow { + x match { + case Right(response) ⇒ compactEntity(response.entity).map(x ⇒ Right(response.withEntity(x))) + case Left(error) ⇒ Future.successful(Left(error.info.formatPretty)) + } + }.toProducer(materializer) + } + .flatten(FlattenStrategy.concat) + .grouped(1000).toFuture(materializer) + Await.result(future, 250.millis) + } + + def newParser(requestMethod: HttpMethod = GET) = { + val parser = new HttpResponseParser(ParserSettings(system), materializer, + dequeueRequestMethodForNextResponse = () ⇒ requestMethod)() + parser + } + + private def compactEntity(entity: HttpEntity): Future[HttpEntity] = + entity match { + case x: HttpEntity.Chunked ⇒ compactEntityChunks(x.chunks).map(compacted ⇒ x.copy(chunks = compacted)) + case _ ⇒ entity.toStrict(250.millis, materializer) + } + + private def compactEntityChunks(data: Producer[ChunkStreamPart]): Future[Producer[ChunkStreamPart]] = + Flow(data).grouped(1000).toFuture(materializer) + .map(producer(_: _*)) + .recover { + case _: NoSuchElementException ⇒ producer[ChunkStreamPart]() + } + + def prep(response: String) = response.stripMarginWithNewline("\r\n") + + def producer[T](elems: T*): Producer[T] = SynchronousProducerFromIterable(elems.toList) + } +} diff --git a/akka-http-core/src/test/scala/akka/http/rendering/RequestRendererSpec.scala b/akka-http-core/src/test/scala/akka/http/rendering/RequestRendererSpec.scala new file mode 100644 index 0000000000..c4a073ee9a --- /dev/null +++ b/akka-http-core/src/test/scala/akka/http/rendering/RequestRendererSpec.scala @@ -0,0 +1,199 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ + +package akka.http.rendering + +import com.typesafe.config.{ Config, ConfigFactory } +import java.net.InetSocketAddress +import scala.concurrent.duration._ +import scala.concurrent.Await +import org.scalatest.{ FreeSpec, Matchers, BeforeAndAfterAll } +import org.scalatest.matchers.Matcher +import akka.actor.ActorSystem +import akka.event.NoLogging +import akka.http.model._ +import akka.http.model.headers._ +import akka.http.util._ +import akka.stream.scaladsl.Flow +import akka.stream.{ MaterializerSettings, FlowMaterializer } +import akka.stream.impl.SynchronousProducerFromIterable +import HttpEntity._ +import HttpMethods._ + +class RequestRendererSpec extends FreeSpec with Matchers with BeforeAndAfterAll { + val testConf: Config = ConfigFactory.parseString(""" + akka.event-handlers = ["akka.testkit.TestEventListener"] + akka.loglevel = WARNING""") + implicit val system = ActorSystem(getClass.getSimpleName, testConf) + import system.dispatcher + + val materializer = FlowMaterializer(MaterializerSettings()) + + "The request preparation logic should" - { + "properly render an unchunked" - { + + "GET request without headers and without body" in new TestSetup() { + HttpRequest(GET, "/abc") should renderTo { + """GET /abc HTTP/1.1 + |Host: test.com:8080 + |User-Agent: spray-can/1.0.0 + | + |""" + } + } + + "GET request with a URI that requires encoding" in new TestSetup() { + HttpRequest(GET, "/abc close).matcher[(String, Boolean)] compose { input ⇒ + equal(expected.stripMarginWithNewline("\r\n") -> close).matcher[(String, Boolean)] compose { ctx ⇒ val renderer = newRenderer - val byteStringProducer :: Nil = renderer.onNext(input) + val byteStringProducer :: Nil = renderer.onNext(ctx) val future = Flow(byteStringProducer).grouped(1000).toFuture(materializer).map(_.reduceLeft(_ ++ _).utf8String) Await.result(future, 250.millis) -> renderer.isComplete }