!htc #16641 support remote-address-header setting
This commit is contained in:
parent
967c0ed2be
commit
62c48aadfd
5 changed files with 40 additions and 10 deletions
|
|
@ -4,6 +4,8 @@
|
|||
|
||||
package akka.http.impl.engine.server
|
||||
|
||||
import java.net.InetSocketAddress
|
||||
|
||||
import akka.http.ServerSettings
|
||||
import akka.stream.io._
|
||||
import org.reactivestreams.{ Subscriber, Publisher }
|
||||
|
|
@ -32,7 +34,7 @@ private[http] object HttpServerBluePrint {
|
|||
|
||||
type ServerShape = BidiShape[HttpResponse, SslTlsOutbound, SslTlsInbound, HttpRequest]
|
||||
|
||||
def apply(settings: ServerSettings, log: LoggingAdapter)(implicit mat: FlowMaterializer): Graph[ServerShape, Unit] = {
|
||||
def apply(settings: ServerSettings, remoteAddress: Option[InetSocketAddress], log: LoggingAdapter)(implicit mat: FlowMaterializer): Graph[ServerShape, Unit] = {
|
||||
import settings._
|
||||
|
||||
// the initial header parser we initially use for every connection,
|
||||
|
|
@ -65,10 +67,15 @@ private[http] object HttpServerBluePrint {
|
|||
.splitWhen(x ⇒ x.isInstanceOf[MessageStart] || x == MessageEnd)
|
||||
.via(headAndTailFlow)
|
||||
.map {
|
||||
case (RequestStart(method, uri, protocol, headers, createEntity, _, _), entityParts) ⇒
|
||||
val effectiveUri = HttpRequest.effectiveUri(uri, headers, securedConnection = false, defaultHostHeader)
|
||||
case (RequestStart(method, uri, protocol, hdrs, createEntity, _, _), entityParts) ⇒
|
||||
val effectiveUri = HttpRequest.effectiveUri(uri, hdrs, securedConnection = false, defaultHostHeader)
|
||||
val effectiveMethod = if (method == HttpMethods.HEAD && transparentHeadRequests) HttpMethods.GET else method
|
||||
HttpRequest(effectiveMethod, effectiveUri, headers, createEntity(entityParts), protocol)
|
||||
val effectiveHeaders =
|
||||
if (settings.remoteAddressHeader && remoteAddress.isDefined)
|
||||
headers.`Remote-Address`(RemoteAddress(remoteAddress.get.getAddress)) +: hdrs
|
||||
else hdrs
|
||||
|
||||
HttpRequest(effectiveMethod, effectiveUri, effectiveHeaders, createEntity(entityParts), protocol)
|
||||
case (_, src) ⇒ src.runWith(Sink.ignore)
|
||||
}.collect {
|
||||
case r: HttpRequest ⇒ r
|
||||
|
|
|
|||
|
|
@ -61,7 +61,7 @@ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.E
|
|||
Tcp().bind(interface, effectivePort, settings.backlog, settings.socketOptions, settings.timeouts.idleTimeout)
|
||||
connections.map {
|
||||
case Tcp.IncomingConnection(localAddress, remoteAddress, flow) ⇒
|
||||
val layer = serverLayer(settings, log)
|
||||
val layer = serverLayer(settings, Some(remoteAddress), log)
|
||||
IncomingConnection(localAddress, remoteAddress, layer atop tlsStage join flow)
|
||||
}.mapMaterializedValue {
|
||||
_.map(tcpBinding ⇒ ServerBinding(tcpBinding.localAddress)(() ⇒ tcpBinding.unbind()))(fm.executionContext)
|
||||
|
|
@ -155,8 +155,9 @@ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.E
|
|||
* can only be materialized once.
|
||||
*/
|
||||
def serverLayer(settings: ServerSettings,
|
||||
remoteAddress: Option[InetSocketAddress] = None,
|
||||
log: LoggingAdapter = system.log)(implicit mat: FlowMaterializer): ServerLayer =
|
||||
BidiFlow.wrap(HttpServerBluePrint(settings, log))
|
||||
BidiFlow.wrap(HttpServerBluePrint(settings, remoteAddress, log))
|
||||
|
||||
/**
|
||||
* Creates a [[Flow]] representing a prospective HTTP client connection to the given endpoint.
|
||||
|
|
|
|||
|
|
@ -280,7 +280,7 @@ class ConnectionPoolSpec extends AkkaSpec("""
|
|||
// TODO getHostString in Java7
|
||||
Tcp().bind(serverEndpoint.getHostName, serverEndpoint.getPort, idleTimeout = serverSettings.timeouts.idleTimeout)
|
||||
.map { c ⇒
|
||||
val layer = Http().serverLayer(serverSettings, log)
|
||||
val layer = Http().serverLayer(serverSettings, log = log)
|
||||
Http.IncomingConnection(c.localAddress, c.remoteAddress, layer atop rawBytesInjection join c.flow)
|
||||
}.runWith(sink)
|
||||
if (autoAccept) null else incomingConnections.expectSubscription()
|
||||
|
|
|
|||
|
|
@ -4,7 +4,8 @@
|
|||
|
||||
package akka.http.impl.engine.server
|
||||
|
||||
import akka.actor.ActorSystem
|
||||
import java.net.{ InetAddress, InetSocketAddress }
|
||||
|
||||
import akka.http.ServerSettings
|
||||
|
||||
import scala.util.Random
|
||||
|
|
@ -13,7 +14,7 @@ import scala.concurrent.duration._
|
|||
import org.scalatest.Inside
|
||||
import akka.util.ByteString
|
||||
import akka.stream.scaladsl._
|
||||
import akka.stream.{ FlowMaterializer, ActorFlowMaterializer }
|
||||
import akka.stream.ActorFlowMaterializer
|
||||
import akka.stream.testkit._
|
||||
import akka.http.scaladsl.model._
|
||||
import akka.http.impl.util._
|
||||
|
|
@ -659,6 +660,24 @@ class HttpServerSpec extends AkkaSpec("akka.loggers = []\n akka.loglevel = OFF")
|
|||
|
||||
expectRequest shouldEqual HttpRequest(uri = "http://example.com//foo", headers = List(Host("example.com")))
|
||||
}
|
||||
|
||||
"support remote-address-header" in new TestSetup {
|
||||
lazy val theAddress = InetAddress.getByName("127.5.2.1")
|
||||
|
||||
override def remoteAddress: Option[InetSocketAddress] =
|
||||
Some(new InetSocketAddress(theAddress, 8080))
|
||||
|
||||
override def settings: ServerSettings =
|
||||
super.settings.copy(remoteAddressHeader = true)
|
||||
|
||||
send("""GET / HTTP/1.1
|
||||
|Host: example.com
|
||||
|
|
||||
|""".stripMarginWithNewline("\r\n"))
|
||||
|
||||
val request = expectRequest
|
||||
request.headers should contain(`Remote-Address`(RemoteAddress(theAddress)))
|
||||
}
|
||||
}
|
||||
class TestSetup extends HttpServerTestSetupBase {
|
||||
implicit def system = spec.system
|
||||
|
|
|
|||
|
|
@ -4,6 +4,8 @@
|
|||
|
||||
package akka.http.impl.engine.server
|
||||
|
||||
import java.net.InetSocketAddress
|
||||
|
||||
import akka.stream.io.{ SendBytes, SslTlsOutbound, SessionBytes }
|
||||
|
||||
import scala.concurrent.duration.FiniteDuration
|
||||
|
|
@ -30,12 +32,13 @@ abstract class HttpServerTestSetupBase {
|
|||
val responses = TestPublisher.manualProbe[HttpResponse]
|
||||
|
||||
def settings = ServerSettings(system).copy(serverHeader = Some(Server(List(ProductVersion("akka-http", "test")))))
|
||||
def remoteAddress: Option[InetSocketAddress] = None
|
||||
|
||||
val (netIn, netOut) = {
|
||||
val netIn = TestPublisher.manualProbe[ByteString]
|
||||
val netOut = TestSubscriber.manualProbe[ByteString]
|
||||
|
||||
FlowGraph.closed(HttpServerBluePrint(settings, NoLogging)) { implicit b ⇒
|
||||
FlowGraph.closed(HttpServerBluePrint(settings, remoteAddress = remoteAddress, log = NoLogging)) { implicit b ⇒
|
||||
server ⇒
|
||||
import FlowGraph.Implicits._
|
||||
Source(netIn) ~> Flow[ByteString].map(SessionBytes(null, _)) ~> server.in2
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue