From 62c48aadfd8deb3d7ac484571b25dd9156b44919 Mon Sep 17 00:00:00 2001 From: Johannes Rudolph Date: Fri, 5 Jun 2015 14:03:15 +0200 Subject: [PATCH] !htc #16641 support remote-address-header setting --- .../engine/server/HttpServerBluePrint.scala | 15 ++++++++---- .../main/scala/akka/http/scaladsl/Http.scala | 5 ++-- .../engine/client/ConnectionPoolSpec.scala | 2 +- .../impl/engine/server/HttpServerSpec.scala | 23 +++++++++++++++++-- .../server/HttpServerTestSetupBase.scala | 5 +++- 5 files changed, 40 insertions(+), 10 deletions(-) diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala index 8f39a991e8..fcc2baa5d7 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala @@ -4,6 +4,8 @@ package akka.http.impl.engine.server +import java.net.InetSocketAddress + import akka.http.ServerSettings import akka.stream.io._ import org.reactivestreams.{ Subscriber, Publisher } @@ -32,7 +34,7 @@ private[http] object HttpServerBluePrint { type ServerShape = BidiShape[HttpResponse, SslTlsOutbound, SslTlsInbound, HttpRequest] - def apply(settings: ServerSettings, log: LoggingAdapter)(implicit mat: FlowMaterializer): Graph[ServerShape, Unit] = { + def apply(settings: ServerSettings, remoteAddress: Option[InetSocketAddress], log: LoggingAdapter)(implicit mat: FlowMaterializer): Graph[ServerShape, Unit] = { import settings._ // the initial header parser we initially use for every connection, @@ -65,10 +67,15 @@ private[http] object HttpServerBluePrint { .splitWhen(x ⇒ x.isInstanceOf[MessageStart] || x == MessageEnd) .via(headAndTailFlow) .map { - case (RequestStart(method, uri, protocol, headers, createEntity, _, _), entityParts) ⇒ - val effectiveUri = HttpRequest.effectiveUri(uri, headers, securedConnection = false, defaultHostHeader) + case (RequestStart(method, uri, protocol, hdrs, createEntity, _, _), entityParts) ⇒ + val effectiveUri = HttpRequest.effectiveUri(uri, hdrs, securedConnection = false, defaultHostHeader) val effectiveMethod = if (method == HttpMethods.HEAD && transparentHeadRequests) HttpMethods.GET else method - HttpRequest(effectiveMethod, effectiveUri, headers, createEntity(entityParts), protocol) + val effectiveHeaders = + if (settings.remoteAddressHeader && remoteAddress.isDefined) + headers.`Remote-Address`(RemoteAddress(remoteAddress.get.getAddress)) +: hdrs + else hdrs + + HttpRequest(effectiveMethod, effectiveUri, effectiveHeaders, createEntity(entityParts), protocol) case (_, src) ⇒ src.runWith(Sink.ignore) }.collect { case r: HttpRequest ⇒ r diff --git a/akka-http-core/src/main/scala/akka/http/scaladsl/Http.scala b/akka-http-core/src/main/scala/akka/http/scaladsl/Http.scala index 40b6b86de5..1643d0c804 100644 --- a/akka-http-core/src/main/scala/akka/http/scaladsl/Http.scala +++ b/akka-http-core/src/main/scala/akka/http/scaladsl/Http.scala @@ -61,7 +61,7 @@ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.E Tcp().bind(interface, effectivePort, settings.backlog, settings.socketOptions, settings.timeouts.idleTimeout) connections.map { case Tcp.IncomingConnection(localAddress, remoteAddress, flow) ⇒ - val layer = serverLayer(settings, log) + val layer = serverLayer(settings, Some(remoteAddress), log) IncomingConnection(localAddress, remoteAddress, layer atop tlsStage join flow) }.mapMaterializedValue { _.map(tcpBinding ⇒ ServerBinding(tcpBinding.localAddress)(() ⇒ tcpBinding.unbind()))(fm.executionContext) @@ -155,8 +155,9 @@ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.E * can only be materialized once. */ def serverLayer(settings: ServerSettings, + remoteAddress: Option[InetSocketAddress] = None, log: LoggingAdapter = system.log)(implicit mat: FlowMaterializer): ServerLayer = - BidiFlow.wrap(HttpServerBluePrint(settings, log)) + BidiFlow.wrap(HttpServerBluePrint(settings, remoteAddress, log)) /** * Creates a [[Flow]] representing a prospective HTTP client connection to the given endpoint. diff --git a/akka-http-core/src/test/scala/akka/http/impl/engine/client/ConnectionPoolSpec.scala b/akka-http-core/src/test/scala/akka/http/impl/engine/client/ConnectionPoolSpec.scala index 98d26007f3..dbead72306 100644 --- a/akka-http-core/src/test/scala/akka/http/impl/engine/client/ConnectionPoolSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/impl/engine/client/ConnectionPoolSpec.scala @@ -280,7 +280,7 @@ class ConnectionPoolSpec extends AkkaSpec(""" // TODO getHostString in Java7 Tcp().bind(serverEndpoint.getHostName, serverEndpoint.getPort, idleTimeout = serverSettings.timeouts.idleTimeout) .map { c ⇒ - val layer = Http().serverLayer(serverSettings, log) + val layer = Http().serverLayer(serverSettings, log = log) Http.IncomingConnection(c.localAddress, c.remoteAddress, layer atop rawBytesInjection join c.flow) }.runWith(sink) if (autoAccept) null else incomingConnections.expectSubscription() diff --git a/akka-http-core/src/test/scala/akka/http/impl/engine/server/HttpServerSpec.scala b/akka-http-core/src/test/scala/akka/http/impl/engine/server/HttpServerSpec.scala index 589d480fa1..acf2e0b175 100644 --- a/akka-http-core/src/test/scala/akka/http/impl/engine/server/HttpServerSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/impl/engine/server/HttpServerSpec.scala @@ -4,7 +4,8 @@ package akka.http.impl.engine.server -import akka.actor.ActorSystem +import java.net.{ InetAddress, InetSocketAddress } + import akka.http.ServerSettings import scala.util.Random @@ -13,7 +14,7 @@ import scala.concurrent.duration._ import org.scalatest.Inside import akka.util.ByteString import akka.stream.scaladsl._ -import akka.stream.{ FlowMaterializer, ActorFlowMaterializer } +import akka.stream.ActorFlowMaterializer import akka.stream.testkit._ import akka.http.scaladsl.model._ import akka.http.impl.util._ @@ -659,6 +660,24 @@ class HttpServerSpec extends AkkaSpec("akka.loggers = []\n akka.loglevel = OFF") expectRequest shouldEqual HttpRequest(uri = "http://example.com//foo", headers = List(Host("example.com"))) } + + "support remote-address-header" in new TestSetup { + lazy val theAddress = InetAddress.getByName("127.5.2.1") + + override def remoteAddress: Option[InetSocketAddress] = + Some(new InetSocketAddress(theAddress, 8080)) + + override def settings: ServerSettings = + super.settings.copy(remoteAddressHeader = true) + + send("""GET / HTTP/1.1 + |Host: example.com + | + |""".stripMarginWithNewline("\r\n")) + + val request = expectRequest + request.headers should contain(`Remote-Address`(RemoteAddress(theAddress))) + } } class TestSetup extends HttpServerTestSetupBase { implicit def system = spec.system diff --git a/akka-http-core/src/test/scala/akka/http/impl/engine/server/HttpServerTestSetupBase.scala b/akka-http-core/src/test/scala/akka/http/impl/engine/server/HttpServerTestSetupBase.scala index 2adab65eda..f901202d6e 100644 --- a/akka-http-core/src/test/scala/akka/http/impl/engine/server/HttpServerTestSetupBase.scala +++ b/akka-http-core/src/test/scala/akka/http/impl/engine/server/HttpServerTestSetupBase.scala @@ -4,6 +4,8 @@ package akka.http.impl.engine.server +import java.net.InetSocketAddress + import akka.stream.io.{ SendBytes, SslTlsOutbound, SessionBytes } import scala.concurrent.duration.FiniteDuration @@ -30,12 +32,13 @@ abstract class HttpServerTestSetupBase { val responses = TestPublisher.manualProbe[HttpResponse] def settings = ServerSettings(system).copy(serverHeader = Some(Server(List(ProductVersion("akka-http", "test"))))) + def remoteAddress: Option[InetSocketAddress] = None val (netIn, netOut) = { val netIn = TestPublisher.manualProbe[ByteString] val netOut = TestSubscriber.manualProbe[ByteString] - FlowGraph.closed(HttpServerBluePrint(settings, NoLogging)) { implicit b ⇒ + FlowGraph.closed(HttpServerBluePrint(settings, remoteAddress = remoteAddress, log = NoLogging)) { implicit b ⇒ server ⇒ import FlowGraph.Implicits._ Source(netIn) ~> Flow[ByteString].map(SessionBytes(null, _)) ~> server.in2