From 5a65cf6bb880de52673fa13cfd6e9985c12c3995 Mon Sep 17 00:00:00 2001 From: Mathias Date: Wed, 13 May 2015 14:47:12 +0200 Subject: [PATCH] !htc HTTPS-enable server- and client-side APIs --- .../java/akka/http/javadsl/HttpsContext.java | 35 +++ .../akka/http/ConnectionPoolSettings.scala | 10 +- .../client/OutgoingConnectionBlueprint.scala | 17 +- .../http/impl/engine/client/PoolGateway.scala | 3 +- .../engine/client/PoolInterfaceActor.scala | 16 +- .../http/impl/engine/client/PoolSlot.scala | 3 +- .../engine/server/HttpServerBluePrint.scala | 16 +- .../main/scala/akka/http/javadsl/Http.scala | 121 ++++++++- .../main/scala/akka/http/scaladsl/Http.scala | 243 ++++++++++++++---- .../engine/client/ConnectionPoolSpec.scala | 22 +- .../LowLevelOutgoingConnectionSpec.scala | 23 +- .../server/HttpServerTestSetupBase.scala | 8 +- .../scala/akka/http/scaladsl/TestClient.scala | 12 +- .../main/scala/akka/stream/io/SslTls.scala | 29 ++- 14 files changed, 418 insertions(+), 140 deletions(-) create mode 100644 akka-http-core/src/main/java/akka/http/javadsl/HttpsContext.java diff --git a/akka-http-core/src/main/java/akka/http/javadsl/HttpsContext.java b/akka-http-core/src/main/java/akka/http/javadsl/HttpsContext.java new file mode 100644 index 0000000000..b3820305a3 --- /dev/null +++ b/akka-http-core/src/main/java/akka/http/javadsl/HttpsContext.java @@ -0,0 +1,35 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ + +package akka.http.javadsl; + +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLParameters; + +import akka.japi.Option; +import akka.stream.io.ClientAuth; + +import java.util.Collection; + +public abstract class HttpsContext { + + public abstract SSLContext getSslContext(); + + public abstract Option> getEnabledCipherSuites(); + + public abstract Option> getEnabledProtocols(); + + public abstract Option getClientAuth(); + + public abstract Option getSslParameters(); + + public static HttpsContext create(SSLContext sslContext, + Option> enabledCipherSuites, + Option> enabledProtocols, + Option clientAuth, + Option sslParameters) { + return akka.http.scaladsl.HttpsContext.create(sslContext, enabledCipherSuites, enabledProtocols, + clientAuth, sslParameters); + } +} \ No newline at end of file diff --git a/akka-http-core/src/main/scala/akka/http/ConnectionPoolSettings.scala b/akka-http-core/src/main/scala/akka/http/ConnectionPoolSettings.scala index 0d46ffd824..b7d4a2427b 100644 --- a/akka-http-core/src/main/scala/akka/http/ConnectionPoolSettings.scala +++ b/akka-http-core/src/main/scala/akka/http/ConnectionPoolSettings.scala @@ -5,11 +5,11 @@ package akka.http import java.lang.{ Iterable ⇒ JIterable } +import akka.http.scaladsl.HttpsContext import com.typesafe.config.Config import scala.collection.immutable import scala.concurrent.duration.Duration import akka.japi.Util._ - import akka.actor.ActorSystem import akka.event.LoggingAdapter import akka.http.impl.util._ @@ -18,18 +18,18 @@ import akka.io.Inet final case class HostConnectionPoolSetup(host: String, port: Int, setup: ConnectionPoolSetup) final case class ConnectionPoolSetup( - encrypted: Boolean, options: immutable.Traversable[Inet.SocketOption], settings: ConnectionPoolSettings, + httpsContext: Option[HttpsContext], log: LoggingAdapter) object ConnectionPoolSetup { /** Java API */ - def create(encrypted: Boolean, - options: JIterable[Inet.SocketOption], + def create(options: JIterable[Inet.SocketOption], settings: ConnectionPoolSettings, + httpsContext: akka.japi.Option[akka.http.javadsl.HttpsContext], log: LoggingAdapter): ConnectionPoolSetup = - ConnectionPoolSetup(encrypted, immutableSeq(options), settings, log) + ConnectionPoolSetup(immutableSeq(options), settings, httpsContext.map(_.asInstanceOf[HttpsContext]), log) } final case class ConnectionPoolSettings( diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/client/OutgoingConnectionBlueprint.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/client/OutgoingConnectionBlueprint.scala index d624c05c39..17067ee887 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/client/OutgoingConnectionBlueprint.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/client/OutgoingConnectionBlueprint.scala @@ -5,6 +5,7 @@ package akka.http.impl.engine.client import akka.http.ClientConnectionSettings +import akka.stream.io.{ SessionBytes, SslTlsInbound, SendBytes, SslTlsOutbound } import language.existentials import java.net.InetSocketAddress @@ -27,7 +28,7 @@ import akka.http.impl.util._ */ private[http] object OutgoingConnectionBlueprint { - type ClientShape = BidiShape[HttpRequest, ByteString, ByteString, HttpResponse] + type ClientShape = BidiShape[HttpRequest, SslTlsOutbound, SslTlsInbound, HttpResponse] /* Stream Setup @@ -87,10 +88,12 @@ private[http] object OutgoingConnectionBlueprint { val terminationFanout = b.add(Broadcast[HttpResponse](2)) val terminationMerge = b.add(new TerminationMerge) - val logger = Flow[ByteString].transform(() ⇒ errorLogger(log, "Outgoing request stream error")).named("errorLogger") - val bytesOut = (terminationMerge.out ~> requestRendering.via(logger)).outlet + val logger = b.add(Flow[ByteString].transform(() ⇒ errorLogger(log, "Outgoing request stream error")).named("errorLogger")) + val wrapTls = b.add(Flow[ByteString].map(SendBytes)) + terminationMerge.out ~> requestRendering ~> logger ~> wrapTls - val bytesIn = responseParsingMerge.in0 + val unwrapTls = b.add(Flow[SslTlsInbound].collect { case SessionBytes(_, bytes) ⇒ bytes }) + unwrapTls ~> responseParsingMerge.in0 methodBypassFanout.out(0) ~> terminationMerge.in0 @@ -99,10 +102,10 @@ private[http] object OutgoingConnectionBlueprint { responseParsingMerge.out ~> responsePrep ~> terminationFanout.in terminationFanout.out(0) ~> terminationMerge.in1 - BidiShape[HttpRequest, ByteString, ByteString, HttpResponse]( + BidiShape[HttpRequest, SslTlsOutbound, SslTlsInbound, HttpResponse]( methodBypassFanout.in, - bytesOut, - bytesIn, + wrapTls.outlet, + unwrapTls.inlet, terminationFanout.out(1)) } } diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolGateway.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolGateway.scala index 34005517e6..4b497a7af5 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolGateway.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolGateway.scala @@ -1,10 +1,9 @@ package akka.http.impl.engine.client import java.util.concurrent.atomic.AtomicReference -import akka.http.HostConnectionPoolSetup - import scala.annotation.tailrec import scala.concurrent.{ Future, Promise } +import akka.http.HostConnectionPoolSetup import akka.actor.{ Props, ActorSystem, ActorRef } import akka.http.scaladsl.Http import akka.http.scaladsl.model.{ HttpResponse, HttpRequest } diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolInterfaceActor.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolInterfaceActor.scala index 8935528b1d..62aa113ffc 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolInterfaceActor.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolInterfaceActor.scala @@ -5,8 +5,6 @@ package akka.http.impl.engine.client import java.net.InetSocketAddress -import akka.http.HostConnectionPoolSetup - import scala.annotation.tailrec import scala.concurrent.Promise import scala.concurrent.duration.FiniteDuration @@ -17,8 +15,9 @@ import akka.stream.actor.ActorPublisherMessage._ import akka.stream.actor.ActorSubscriberMessage._ import akka.stream.impl.FixedSizeBuffer import akka.stream.scaladsl.{ Sink, Source } -import akka.http.scaladsl.model.{ HttpResponse, HttpRequest } +import akka.http.HostConnectionPoolSetup import akka.http.impl.util.SeqActorName +import akka.http.scaladsl.model._ import akka.http.scaladsl.Http import PoolFlow._ @@ -59,7 +58,9 @@ private class PoolInterfaceActor(hcps: HostConnectionPoolSetup, import context.system import hcps._ import setup._ - val connectionFlow = Http().outgoingConnection(host, port, None, options, settings.connectionSettings, setup.log) + val connectionFlow = + if (httpsContext.isEmpty) Http().outgoingConnection(host, port, None, options, settings.connectionSettings, setup.log) + else Http().outgoingConnectionTls(host, port, None, options, settings.connectionSettings, httpsContext, setup.log) val poolFlow = PoolFlow(connectionFlow, new InetSocketAddress(host, port), settings, setup.log) Source(ActorPublisher(self)).via(poolFlow).runWith(Sink(ActorSubscriber[ResponseContext](self))) } @@ -135,7 +136,12 @@ private class PoolInterfaceActor(hcps: HostConnectionPoolSetup, } def dispatchRequest(pr: PoolRequest): Unit = { - val effectiveRequest = pr.request.withUri(pr.request.uri.toHttpRequestTargetOriginForm) + val scheme = Uri.httpScheme(hcps.setup.httpsContext.isDefined) + val hostHeader = headers.Host(hcps.host, Uri.normalizePort(hcps.port, scheme)) + val effectiveRequest = + pr.request + .withUri(pr.request.uri.toHttpRequestTargetOriginForm) + .withDefaultHeaders(hostHeader) val retries = if (pr.request.method.isIdempotent) hcps.setup.settings.maxRetries else 0 onNext(RequestContext(effectiveRequest, pr.responsePromise, retries)) } diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolSlot.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolSlot.scala index 00a336d7ce..0f8fecd77a 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolSlot.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolSlot.scala @@ -4,14 +4,13 @@ package akka.http.impl.engine.client -import akka.http.ConnectionPoolSettings - import language.existentials import java.net.InetSocketAddress import scala.util.{ Failure, Success } import scala.collection.immutable import akka.actor._ import akka.http.scaladsl.model.{ HttpResponse, HttpRequest } +import akka.http.ConnectionPoolSettings import akka.http.impl.util._ import akka.stream.impl.{ SubscribePending, ExposedPublisher, ActorProcessor } import akka.stream.actor._ diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala index 07a89583e4..5f5847b054 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala @@ -5,6 +5,7 @@ package akka.http.impl.engine.server import akka.http.ServerSettings +import akka.stream.io._ import org.reactivestreams.{ Subscriber, Publisher } import scala.util.control.NonFatal import akka.util.ByteString @@ -29,7 +30,7 @@ import ParserOutput._ */ private[http] object HttpServerBluePrint { - type ServerShape = BidiShape[HttpResponse, ByteString, ByteString, HttpRequest] + type ServerShape = BidiShape[HttpResponse, SslTlsOutbound, SslTlsInbound, HttpRequest] def apply(settings: ServerSettings, log: LoggingAdapter)(implicit mat: FlowMaterializer): Graph[ServerShape, Unit] = { import settings._ @@ -135,13 +136,16 @@ private[http] object HttpServerBluePrint { switchSource ~> wsSwitchTokenMerge.in1 wsSwitchTokenMerge.out ~> protocolRouter.in - val netIn = wsSwitchTokenMerge.in0 - val netOut = protocolMerge.out + val unwrapTls = b.add(Flow[SslTlsInbound] collect { case x: SessionBytes ⇒ x.bytes }) + val wrapTls = b.add(Flow[ByteString].map[SslTlsOutbound](SendBytes)) - BidiShape[HttpResponse, ByteString, ByteString, HttpRequest]( + unwrapTls ~> wsSwitchTokenMerge.in0 + protocolMerge.out ~> wrapTls + + BidiShape[HttpResponse, SslTlsOutbound, SslTlsInbound, HttpRequest]( bypassApplicationInput, - netOut, - netIn, + wrapTls.outlet, + unwrapTls.inlet, requestsIn) } } diff --git a/akka-http-core/src/main/scala/akka/http/javadsl/Http.scala b/akka-http-core/src/main/scala/akka/http/javadsl/Http.scala index 1c4e43e0bc..7fe028751e 100644 --- a/akka-http-core/src/main/scala/akka/http/javadsl/Http.scala +++ b/akka-http-core/src/main/scala/akka/http/javadsl/Http.scala @@ -6,10 +6,10 @@ package akka.http.javadsl import java.lang.{ Iterable ⇒ JIterable } import java.net.InetSocketAddress -import akka.http._ -import akka.stream.scaladsl.Keep - +import scala.language.implicitConversions import scala.concurrent.Future +import scala.util.Try +import akka.stream.scaladsl.Keep import akka.japi.Util._ import akka.japi.{ Option, Function } import akka.actor.{ ExtendedActorSystem, ActorSystem, ExtensionIdProvider, ExtensionId } @@ -17,11 +17,10 @@ import akka.event.LoggingAdapter import akka.io.Inet import akka.stream.FlowMaterializer import akka.stream.javadsl.{ Flow, Source } +import akka.http.impl.util.JavaMapping.Implicits._ import akka.http.scaladsl.{ model ⇒ sm } import akka.http.javadsl.model._ -import akka.http.impl.util.JavaMapping.Implicits._ - -import scala.util.Try +import akka.http._ object Http extends ExtensionId[Http] with ExtensionIdProvider { override def get(system: ActorSystem): Http = super.get(system) @@ -34,6 +33,9 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension { private lazy val delegate = akka.http.scaladsl.Http(system) + private implicit def convertHttpsContext(hctx: Option[HttpsContext]) = + hctx.map(_.asInstanceOf[akka.http.scaladsl.HttpsContext]) + /** * Creates a [[Source]] of [[IncomingConnection]] instances which represents a prospective HTTP server binding * on the given `endpoint`. @@ -62,9 +64,10 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension { def bind(interface: String, port: Int, backlog: Int, options: JIterable[Inet.SocketOption], settings: ServerSettings, + httpsContext: Option[HttpsContext], log: LoggingAdapter, materializer: FlowMaterializer): Source[IncomingConnection, Future[ServerBinding]] = - Source.adapt(delegate.bind(interface, port, backlog, immutableSeq(options), settings, log)(materializer) + Source.adapt(delegate.bind(interface, port, backlog, immutableSeq(options), settings, httpsContext, log)(materializer) .map(new IncomingConnection(_)) .mapMaterializedValue(_.map(new ServerBinding(_))(ec))) @@ -95,10 +98,11 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension { interface: String, port: Int, backlog: Int, options: JIterable[Inet.SocketOption], settings: ServerSettings, + httpsContext: Option[HttpsContext], log: LoggingAdapter, materializer: FlowMaterializer): Future[ServerBinding] = delegate.bindAndHandle(handler.asInstanceOf[Flow[sm.HttpRequest, sm.HttpResponse, _]].asScala, - interface, port, backlog, immutableSeq(options), settings, log)(materializer) + interface, port, backlog, immutableSeq(options), settings, httpsContext, log)(materializer) .map(new ServerBinding(_))(ec) /** @@ -127,10 +131,11 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension { interface: String, port: Int, backlog: Int, options: JIterable[Inet.SocketOption], settings: ServerSettings, + httpsContext: Option[HttpsContext], log: LoggingAdapter, materializer: FlowMaterializer): Future[ServerBinding] = delegate.bindAndHandleSync(handler.apply(_).asScala, - interface, port, backlog, immutableSeq(options), settings, log)(materializer) + interface, port, backlog, immutableSeq(options), settings, httpsContext, log)(materializer) .map(new ServerBinding(_))(ec) /** @@ -158,11 +163,11 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension { def bindAndHandleAsync(handler: Function[HttpRequest, Future[HttpResponse]], interface: String, port: Int, backlog: Int, options: JIterable[Inet.SocketOption], - settings: ServerSettings, - log: LoggingAdapter, + settings: ServerSettings, httpsContext: Option[HttpsContext], + parallelism: Int, log: LoggingAdapter, materializer: FlowMaterializer): Future[ServerBinding] = delegate.bindAndHandleAsync(handler.apply(_).asInstanceOf[Future[sm.HttpResponse]], - interface, port, backlog, immutableSeq(options), settings, log)(materializer) + interface, port, backlog, immutableSeq(options), settings, httpsContext, parallelism, log)(materializer) .map(new ServerBinding(_))(ec) /** @@ -176,6 +181,16 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension { .mapMaterializedValue(_.map(new OutgoingConnection(_))(ec)) } + /** + * Same as [[outgoingConnection]] but with HTTPS encryption. + */ + def outgoingConnectionTls(host: String, port: Int): Flow[HttpRequest, HttpResponse, Future[OutgoingConnection]] = + Flow.wrap { + akka.stream.scaladsl.Flow[HttpRequest].map(_.asScala) + .viaMat(delegate.outgoingConnectionTls(host, port))(Keep.right) + .mapMaterializedValue(_.map(new OutgoingConnection(_))(ec)) + } + /** * Creates a [[Flow]] representing a prospective HTTP client connection to the given endpoint. * Every materialization of the produced flow will attempt to establish a new outgoing connection. @@ -191,6 +206,25 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension { .mapMaterializedValue(_.map(new OutgoingConnection(_))(ec)) } + /** + * Same as [[outgoingConnection]] but with HTTPS encryption. + * + * If an explicit [[HttpsContext]] is given then it rather than the configured default [[HttpsContext]] will be used + * for encryption on the connection. + */ + def outgoingConnectionTls(host: String, port: Int, + localAddress: Option[InetSocketAddress], + options: JIterable[Inet.SocketOption], + settings: ClientConnectionSettings, + httpsContext: Option[HttpsContext], + log: LoggingAdapter): Flow[HttpRequest, HttpResponse, Future[OutgoingConnection]] = + Flow.wrap { + akka.stream.scaladsl.Flow[HttpRequest].map(_.asScala) + .viaMat(delegate.outgoingConnectionTls(host, port, localAddress.asScala, immutableSeq(options), settings, + httpsContext.map(_.asInstanceOf[akka.http.scaladsl.HttpsContext]), log))(Keep.right) + .mapMaterializedValue(_.map(new OutgoingConnection(_))(ec)) + } + /** * Starts a new connection pool to the given host and configuration and returns a [[Flow]] which dispatches * the requests from all its materializations across this pool. @@ -208,6 +242,12 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension { def newHostConnectionPool[T](host: String, port: Int, materializer: FlowMaterializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] = adaptTupleFlow(delegate.newHostConnectionPool[T](host, port)(materializer)) + /** + * Same as [[newHostConnectionPool]] but with HTTPS encryption. + */ + def newHostConnectionPoolTls[T](host: String, port: Int, materializer: FlowMaterializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] = + adaptTupleFlow(delegate.newHostConnectionPoolTls[T](host, port)(materializer)) + /** * Starts a new connection pool to the given host and configuration and returns a [[Flow]] which dispatches * the requests from all its materializations across this pool. @@ -228,6 +268,20 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension { log: LoggingAdapter, materializer: FlowMaterializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] = adaptTupleFlow(delegate.newHostConnectionPool[T](host, port, immutableSeq(options), settings, log)(materializer)) + /** + * Same as [[newHostConnectionPool]] but with HTTPS encryption. + * + * If an explicit [[HttpsContext]] is given then it rather than the configured default [[HttpsContext]] will be used + * for encryption on the connection. + */ + def newHostConnectionPoolTls[T](host: String, port: Int, + options: JIterable[Inet.SocketOption], + settings: ConnectionPoolSettings, + httpsContext: Option[HttpsContext], + log: LoggingAdapter, materializer: FlowMaterializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] = + adaptTupleFlow(delegate.newHostConnectionPoolTls[T](host, port, immutableSeq(options), settings, + httpsContext.map(_.asInstanceOf[akka.http.scaladsl.HttpsContext]), log)(materializer)) + /** * Starts a new connection pool to the given host and configuration and returns a [[Flow]] which dispatches * the requests from all its materializations across this pool. @@ -265,6 +319,12 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension { def cachedHostConnectionPool[T](host: String, port: Int, materializer: FlowMaterializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] = adaptTupleFlow(delegate.cachedHostConnectionPool[T](host, port)(materializer)) + /** + * Same as [[cachedHostConnectionPool]] but with HTTPS encryption. + */ + def cachedHostConnectionPoolTls[T](host: String, port: Int, materializer: FlowMaterializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] = + adaptTupleFlow(delegate.cachedHostConnectionPoolTls[T](host, port)(materializer)) + /** * Returns a [[Flow]] which dispatches incoming HTTP requests to the per-ActorSystem pool of outgoing * HTTP connections to the given target host endpoint. For every ActorSystem, target host and pool @@ -288,6 +348,20 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension { log: LoggingAdapter, materializer: FlowMaterializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] = adaptTupleFlow(delegate.cachedHostConnectionPool[T](host, port, immutableSeq(options), settings, log)(materializer)) + /** + * Same as [[cachedHostConnectionPool]] but with HTTPS encryption. + * + * If an explicit [[HttpsContext]] is given then it rather than the configured default [[HttpsContext]] will be used + * for encryption on the connection. + */ + def cachedHostConnectionPoolTls[T](host: String, port: Int, + options: JIterable[Inet.SocketOption], + settings: ConnectionPoolSettings, + httpsContext: Option[HttpsContext], + log: LoggingAdapter, materializer: FlowMaterializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] = + adaptTupleFlow(delegate.cachedHostConnectionPoolTls[T](host, port, immutableSeq(options), settings, + httpsContext.map(_.asInstanceOf[akka.http.scaladsl.HttpsContext]), log)(materializer)) + /** * Returns a [[Flow]] which dispatches incoming HTTP requests to the per-ActorSystem pool of outgoing * HTTP connections to the given target host endpoint. For every ActorSystem, target host and pool @@ -328,6 +402,9 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension { * depending on their respective effective URI. Note that incoming requests must have either an absolute URI or * a valid `Host` header. * + * If an explicit [[HttpsContext]] is given then it rather than the configured default [[HttpsContext]] will be used + * for setting up the HTTPS connection pool, if required. + * * Since the underlying transport usually comprises more than a single connection the produced flow might generate * responses in an order that doesn't directly match the consumed requests. * For example, if two requests A and B enter the flow in that order the response for B might be produced before the @@ -337,8 +414,9 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension { */ def superPool[T](options: JIterable[Inet.SocketOption], settings: ConnectionPoolSettings, + httpsContext: Option[HttpsContext], log: LoggingAdapter, materializer: FlowMaterializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), Unit] = - adaptTupleFlow(delegate.superPool[T](immutableSeq(options), settings, log)(materializer)) + adaptTupleFlow(delegate.superPool[T](immutableSeq(options), settings, httpsContext, log)(materializer)) /** * Fires a single [[HttpRequest]] across the (cached) host connection pool for the request's @@ -354,14 +432,18 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension { * Fires a single [[HttpRequest]] across the (cached) host connection pool for the request's * effective URI to produce a response future. * + * If an explicit [[HttpsContext]] is given then it rather than the configured default [[HttpsContext]] will be used + * for setting up the HTTPS connection pool, if required. + * * Note that the request must have either an absolute URI or a valid `Host` header, otherwise * the future will be completed with an error. */ def singleRequest(request: HttpRequest, options: JIterable[Inet.SocketOption], settings: ConnectionPoolSettings, + httpsContext: Option[HttpsContext], log: LoggingAdapter, materializer: FlowMaterializer): Future[HttpResponse] = - delegate.singleRequest(request.asScala, immutableSeq(options), settings, log)(materializer) + delegate.singleRequest(request.asScala, immutableSeq(options), settings, httpsContext, log)(materializer) /** * Triggers an orderly shutdown of all host connections pools currently maintained by the [[ActorSystem]]. @@ -373,6 +455,17 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension { */ def shutdownAllConnectionPools(): Future[Unit] = delegate.shutdownAllConnectionPools() + /** + * Gets the current default client-side [[HttpsContext]]. + */ + def defaultClientHttpsContext: HttpsContext = delegate.defaultClientHttpsContext + + /** + * Sets the default client-side [[HttpsContext]]. + */ + def setDefaultClientHttpsContext(context: HttpsContext): Unit = + delegate.setDefaultClientHttpsContext(context.asInstanceOf[akka.http.scaladsl.HttpsContext]) + private def adaptTupleFlow[T, Mat](scalaFlow: akka.stream.scaladsl.Flow[(scaladsl.model.HttpRequest, T), (Try[HttpResponse], T), Mat]): Flow[(HttpRequest, T), (Try[HttpResponse], T), Mat] = Flow.wrap { // we know that downcasting javadsl.model.HttpRequest => scaladsl.model.HttpRequest will always work diff --git a/akka-http-core/src/main/scala/akka/http/scaladsl/Http.scala b/akka-http-core/src/main/scala/akka/http/scaladsl/Http.scala index bfe26655c9..264b48dcaf 100644 --- a/akka-http-core/src/main/scala/akka/http/scaladsl/Http.scala +++ b/akka-http-core/src/main/scala/akka/http/scaladsl/Http.scala @@ -6,26 +6,34 @@ package akka.http.scaladsl import java.net.InetSocketAddress import java.util.concurrent.ConcurrentHashMap -import akka.http._ +import java.util.{ Collection ⇒ JCollection } +import javax.net.ssl.{ SSLParameters, SSLContext } +import akka.japi import com.typesafe.config.Config import scala.util.Try import scala.util.control.NonFatal -import scala.collection.immutable +import scala.collection.{ JavaConverters, immutable } import scala.concurrent.{ ExecutionContext, Promise, Future } import akka.event.LoggingAdapter -import akka.util.ByteString import akka.io.Inet import akka.stream.FlowMaterializer +import akka.stream.io._ import akka.stream.scaladsl._ import akka.http.impl.engine.client._ import akka.http.impl.engine.server._ import akka.http.scaladsl.util.FastFuture import akka.http.scaladsl.model._ +import akka.http._ import akka.actor._ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.Extension { + import Http._ + // configured default HttpsContext for the client-side + // SYNCHRONIZED ACCESS ONLY! + private[this] var _defaultClientHttpsContext: HttpsContext = _ + /** * Creates a [[Source]] of [[IncomingConnection]] instances which represents a prospective HTTP server binding * on the given `endpoint`. @@ -35,17 +43,26 @@ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.E * If the given port is non-zero subsequent materialization attempts of the produced source will immediately * fail, unless the first materialization has already been unbound. Unbinding can be triggered via the materialized * [[ServerBinding]]. + * + * If an [[HttpsContext]] is given it will be used for setting up TLS encryption on the binding. + * Otherwise the binding will be unencrypted. + * + * If no ``port`` is explicitly given (or the port value is negative) the protocol's default port will be used, + * which is 80 for HTTP and 443 for HTTPS. */ - def bind(interface: String, port: Int = 80, backlog: Int = 100, + def bind(interface: String, port: Int = -1, backlog: Int = 100, options: immutable.Traversable[Inet.SocketOption] = Nil, settings: ServerSettings = ServerSettings(system), + httpsContext: Option[HttpsContext] = None, log: LoggingAdapter = system.log)(implicit fm: FlowMaterializer): Source[IncomingConnection, Future[ServerBinding]] = { + val effectivePort = if (port >= 0) port else if (httpsContext.isEmpty) 80 else 443 + val tlsStage = sslTlsStage(httpsContext, Server) val connections: Source[Tcp.IncomingConnection, Future[Tcp.ServerBinding]] = - Tcp().bind(interface, port, backlog, options, settings.timeouts.idleTimeout) + Tcp().bind(interface, effectivePort, backlog, options, settings.timeouts.idleTimeout) connections.map { case Tcp.IncomingConnection(localAddress, remoteAddress, flow) ⇒ val layer = serverLayer(settings, log) - IncomingConnection(localAddress, remoteAddress, layer join flow) + IncomingConnection(localAddress, remoteAddress, layer atop tlsStage join flow) }.mapMaterializedValue { _.map(tcpBinding ⇒ ServerBinding(tcpBinding.localAddress)(() ⇒ tcpBinding.unbind()))(fm.executionContext) } @@ -60,11 +77,12 @@ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.E * present a DoS risk! */ def bindAndHandle(handler: Flow[HttpRequest, HttpResponse, Any], - interface: String, port: Int = 80, backlog: Int = 100, + interface: String, port: Int = -1, backlog: Int = 100, options: immutable.Traversable[Inet.SocketOption] = Nil, settings: ServerSettings = ServerSettings(system), + httpsContext: Option[HttpsContext] = None, log: LoggingAdapter = system.log)(implicit fm: FlowMaterializer): Future[ServerBinding] = - bind(interface, port, backlog, options, settings, log).to { + bind(interface, port, backlog, options, settings, httpsContext, log).to { Sink.foreach { incomingConnection ⇒ try incomingConnection.flow.joinMat(handler)(Keep.both).run() catch { @@ -84,11 +102,12 @@ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.E * present a DoS risk! */ def bindAndHandleSync(handler: HttpRequest ⇒ HttpResponse, - interface: String, port: Int = 80, backlog: Int = 100, + interface: String, port: Int = -1, backlog: Int = 100, options: immutable.Traversable[Inet.SocketOption] = Nil, settings: ServerSettings = ServerSettings(system), + httpsContext: Option[HttpsContext] = None, log: LoggingAdapter = system.log)(implicit fm: FlowMaterializer): Future[ServerBinding] = - bindAndHandle(Flow[HttpRequest].map(handler), interface, port, backlog, options, settings, log) + bindAndHandle(Flow[HttpRequest].map(handler), interface, port, backlog, options, settings, httpsContext, log) /** * Convenience method which starts a new HTTP server at the given endpoint and uses the given ``handler`` @@ -99,11 +118,13 @@ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.E * present a DoS risk! */ def bindAndHandleAsync(handler: HttpRequest ⇒ Future[HttpResponse], - interface: String, port: Int = 80, backlog: Int = 100, + interface: String, port: Int = -1, backlog: Int = 100, options: immutable.Traversable[Inet.SocketOption] = Nil, settings: ServerSettings = ServerSettings(system), + httpsContext: Option[HttpsContext] = None, + parallelism: Int = 1, log: LoggingAdapter = system.log)(implicit fm: FlowMaterializer): Future[ServerBinding] = - bindAndHandle(Flow[HttpRequest].mapAsync(1)(handler), interface, port, backlog, options, settings, log) + bindAndHandle(Flow[HttpRequest].mapAsync(parallelism)(handler), interface, port, backlog, options, settings, httpsContext, log) /** * The type of the server-side HTTP layer as a stand-alone BidiStage @@ -111,13 +132,13 @@ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.E * * {{{ * +------+ - * HttpResponse ~>| |~> ByteString + * HttpResponse ~>| |~> SslTlsOutbound * | bidi | - * HttpRequest <~| |<~ ByteString + * HttpRequest <~| |<~ SslTlsInbound * +------+ * }}} */ - type ServerLayer = BidiFlow[HttpResponse, ByteString, ByteString, HttpRequest, Unit] + type ServerLayer = BidiFlow[HttpResponse, SslTlsOutbound, SslTlsInbound, HttpRequest, Unit] /** * Constructs a [[ServerLayer]] stage using the configured default [[ServerSettings]]. The returned [[BidiFlow]] @@ -141,14 +162,34 @@ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.E localAddress: Option[InetSocketAddress] = None, options: immutable.Traversable[Inet.SocketOption] = Nil, settings: ClientConnectionSettings = ClientConnectionSettings(system), - log: LoggingAdapter = system.log): Flow[HttpRequest, HttpResponse, Future[OutgoingConnection]] = { + log: LoggingAdapter = system.log): Flow[HttpRequest, HttpResponse, Future[OutgoingConnection]] = + _outgoingConnection(host, port, localAddress, options, settings, None, log) + + /** + * Same as [[outgoingConnection]] but for encrypted (HTTPS) connections. + * + * If an explicit [[HttpsContext]] is given then it rather than the configured default [[HttpsContext]] will be used + * for encryption on the connection. + */ + def outgoingConnectionTls(host: String, port: Int = 443, + localAddress: Option[InetSocketAddress] = None, + options: immutable.Traversable[Inet.SocketOption] = Nil, + settings: ClientConnectionSettings = ClientConnectionSettings(system), + httpsContext: Option[HttpsContext] = None, + log: LoggingAdapter = system.log): Flow[HttpRequest, HttpResponse, Future[OutgoingConnection]] = + _outgoingConnection(host, port, localAddress, options, settings, effectiveHttpsContext(httpsContext), log) + + private def _outgoingConnection(host: String, port: Int, localAddress: Option[InetSocketAddress], + options: immutable.Traversable[Inet.SocketOption], + settings: ClientConnectionSettings, httpsContext: Option[HttpsContext], + log: LoggingAdapter): Flow[HttpRequest, HttpResponse, Future[OutgoingConnection]] = { val remoteAddr = new InetSocketAddress(host, port) val layer = clientLayer(remoteAddr, settings, log) - + val tlsStage = sslTlsStage(httpsContext, Client) val transportFlow = Tcp().outgoingConnection(remoteAddr, localAddress, options, settings.connectingTimeout, settings.idleTimeout) - layer.joinMat(transportFlow) { (_, tcpConnFuture) ⇒ + layer.atop(tlsStage).joinMat(transportFlow) { (_, tcpConnFuture) ⇒ import system.dispatcher tcpConnFuture map { tcpConn ⇒ OutgoingConnection(tcpConn.localAddress, tcpConn.remoteAddress) } } @@ -160,13 +201,13 @@ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.E * * {{{ * +------+ - * HttpRequest ~>| |~> ByteString + * HttpRequest ~>| |~> SslTlsOutbound * | bidi | - * HttpResponse <~| |<~ ByteString + * HttpResponse <~| |<~ SslTlsInbound * +------+ * }}} */ - type ClientLayer = BidiFlow[HttpRequest, ByteString, ByteString, HttpResponse, Unit] + type ClientLayer = BidiFlow[HttpRequest, SslTlsOutbound, SslTlsInbound, HttpResponse, Unit] /** * Constructs a [[ClientLayer]] stage using the configured default [[ClientConnectionSettings]]. @@ -200,9 +241,23 @@ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.E options: immutable.Traversable[Inet.SocketOption] = Nil, settings: ConnectionPoolSettings = ConnectionPoolSettings(system), log: LoggingAdapter = system.log)(implicit fm: FlowMaterializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] = { - val cps = ConnectionPoolSetup(encrypted = false, options, settings, log) - val setup = HostConnectionPoolSetup(host, port, cps) - newHostConnectionPool(setup) + val cps = ConnectionPoolSetup(options, settings, None, log) + newHostConnectionPool(HostConnectionPoolSetup(host, port, cps)) + } + + /** + * Same as [[newHostConnectionPool]] but for encrypted (HTTPS) connections. + * + * If an explicit [[HttpsContext]] is given then it rather than the configured default [[HttpsContext]] will be used + * for encryption on the connections. + */ + def newHostConnectionPoolTls[T](host: String, port: Int = 443, + options: immutable.Traversable[Inet.SocketOption] = Nil, + settings: ConnectionPoolSettings = ConnectionPoolSettings(system), + httpsContext: Option[HttpsContext] = None, + log: LoggingAdapter = system.log)(implicit fm: FlowMaterializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] = { + val cps = ConnectionPoolSetup(options, settings, effectiveHttpsContext(httpsContext), log) + newHostConnectionPool(HostConnectionPoolSetup(host, port, cps)) } /** @@ -246,7 +301,23 @@ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.E options: immutable.Traversable[Inet.SocketOption] = Nil, settings: ConnectionPoolSettings = ConnectionPoolSettings(system), log: LoggingAdapter = system.log)(implicit fm: FlowMaterializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] = { - val cps = ConnectionPoolSetup(encrypted = false, options, settings, log) + val cps = ConnectionPoolSetup(options, settings, None, log) + val setup = HostConnectionPoolSetup(host, port, cps) + cachedHostConnectionPool(setup) + } + + /** + * Same as [[cachedHostConnectionPool]] but for encrypted (HTTPS) connections. + * + * If an explicit [[HttpsContext]] is given then it rather than the configured default [[HttpsContext]] will be used + * for encryption on the connections. + */ + def cachedHostConnectionPoolTls[T](host: String, port: Int = 80, + options: immutable.Traversable[Inet.SocketOption] = Nil, + settings: ConnectionPoolSettings = ConnectionPoolSettings(system), + httpsContext: Option[HttpsContext] = None, + log: LoggingAdapter = system.log)(implicit fm: FlowMaterializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] = { + val cps = ConnectionPoolSetup(options, settings, effectiveHttpsContext(httpsContext), log) val setup = HostConnectionPoolSetup(host, port, cps) cachedHostConnectionPool(setup) } @@ -274,8 +345,10 @@ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.E /** * Creates a new "super connection pool flow", which routes incoming requests to a (cached) host connection pool - * depending on their respective effective URI. Note that incoming requests must have either an absolute URI or - * a valid `Host` header. + * depending on their respective effective URI. Note that incoming requests must have an absolute URI. + * + * If an explicit [[HttpsContext]] is given then it rather than the configured default [[HttpsContext]] will be used + * for setting up HTTPS connection pools, if required. * * Since the underlying transport usually comprises more than a single connection the produced flow might generate * responses in an order that doesn't directly match the consumed requests. @@ -286,35 +359,27 @@ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.E */ def superPool[T](options: immutable.Traversable[Inet.SocketOption] = Nil, settings: ConnectionPoolSettings = ConnectionPoolSettings(system), - log: LoggingAdapter = system.log)(implicit fm: FlowMaterializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), Unit] = { - val setup = ConnectionPoolSetup(encrypted = false, options, settings, log) - clientFlow[T](settings) { request ⇒ - val absoluteRequest = request.withEffectiveUri(securedConnection = false) - val Uri.Authority(host, port, _) = absoluteRequest.uri.authority - val hcps = HostConnectionPoolSetup(host.toString(), port, setup) - val theHostHeader = hostHeader(hcps.host, port, absoluteRequest.uri.scheme) - val effectiveRequest = absoluteRequest.withDefaultHeaders(theHostHeader) - effectiveRequest -> cachedGateway(hcps) - } - } + httpsContext: Option[HttpsContext] = None, + log: LoggingAdapter = system.log)(implicit fm: FlowMaterializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), Unit] = + clientFlow[T](settings) { request ⇒ request -> cachedGateway(request, options, settings, httpsContext, log) } /** * Fires a single [[HttpRequest]] across the (cached) host connection pool for the request's * effective URI to produce a response future. * - * Note that the request must have either an absolute URI or a valid `Host` header, otherwise - * the future will be completed with an error. + * If an explicit [[HttpsContext]] is given then it rather than the configured default [[HttpsContext]] will be used + * for setting up the HTTPS connection pool, if required. + * + * Note that the request must have an absolute URI, otherwise the future will be completed with an error. */ def singleRequest(request: HttpRequest, options: immutable.Traversable[Inet.SocketOption] = Nil, settings: ConnectionPoolSettings = ConnectionPoolSettings(system), + httpsContext: Option[HttpsContext] = None, log: LoggingAdapter = system.log)(implicit fm: FlowMaterializer): Future[HttpResponse] = try { - val setup = ConnectionPoolSetup(encrypted = false, options, settings, log) - val effectiveRequest = request.withEffectiveUri(securedConnection = false) - val uri = effectiveRequest.uri - val hcps = HostConnectionPoolSetup(uri.authority.host.toString(), uri.effectivePort, setup) - cachedGateway(hcps).flatMap(_(effectiveRequest))(fm.executionContext) + val gatewayFuture = cachedGateway(request, options, settings, httpsContext, log) + gatewayFuture.flatMap(_(request))(fm.executionContext) } catch { case e: IllegalUriException ⇒ FastFuture.failed(e) } @@ -335,9 +400,45 @@ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.E Future.sequence(gateways.map(_.flatMap(_.shutdown()))).map(_ ⇒ ()) } + /** + * Gets the current default client-side [[HttpsContext]]. + */ + def defaultClientHttpsContext: HttpsContext = + synchronized { + _defaultClientHttpsContext match { + case null ⇒ + val ctx = HttpsContext(SSLContext.getDefault) + _defaultClientHttpsContext = ctx + ctx + case ctx ⇒ ctx + } + } + + /** + * Sets the default client-side [[HttpsContext]]. + */ + def setDefaultClientHttpsContext(context: HttpsContext): Unit = + synchronized { + _defaultClientHttpsContext = context + } + // every ActorSystem maintains its own connection pools private[this] val hostPoolCache = new ConcurrentHashMap[HostConnectionPoolSetup, Future[PoolGateway]] + private def cachedGateway(request: HttpRequest, options: immutable.Traversable[Inet.SocketOption], + settings: ConnectionPoolSettings, httpsContext: Option[HttpsContext], + log: LoggingAdapter)(implicit fm: FlowMaterializer): Future[PoolGateway] = + if (request.uri.scheme.nonEmpty && request.uri.authority.nonEmpty) { + val httpsCtx = if (request.uri.scheme.equalsIgnoreCase("https")) effectiveHttpsContext(httpsContext) else None + val setup = ConnectionPoolSetup(options, settings, httpsCtx, log) + val host = request.uri.authority.host.toString() + val hcps = HostConnectionPoolSetup(host, request.uri.effectivePort, setup) + cachedGateway(hcps) + } else { + val msg = s"Cannot determine request scheme and target endpoint as ${request.method} request to ${request.uri} doesn't have an absolute URI" + throw new IllegalUriException(ErrorInfo(msg)) + } + private[http] def cachedGateway(setup: HostConnectionPoolSetup)(implicit fm: FlowMaterializer): Future[PoolGateway] = { val gatewayPromise = Promise[PoolGateway]() hostPoolCache.putIfAbsent(setup, gatewayPromise.future) match { @@ -363,12 +464,9 @@ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.E private def gatewayClientFlow[T](hcps: HostConnectionPoolSetup, gatewayFuture: Future[PoolGateway])( - implicit fm: FlowMaterializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] = { - import hcps._ - val theHostHeader = hostHeader(host, port, Uri.httpScheme(setup.encrypted)) - clientFlow[T](setup.settings)(_.withDefaultHeaders(theHostHeader) -> gatewayFuture) + implicit fm: FlowMaterializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] = + clientFlow[T](hcps.setup.settings)(_ -> gatewayFuture) .mapMaterializedValue(_ ⇒ HostConnectionPool(hcps)(gatewayFuture)) - } private def clientFlow[T](settings: ConnectionPoolSettings)(f: HttpRequest ⇒ (HttpRequest, Future[PoolGateway]))( implicit system: ActorSystem, fm: FlowMaterializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), Unit] = { @@ -385,7 +483,14 @@ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.E } } - private def hostHeader(host: String, port: Int, scheme: String) = headers.Host(host, Uri.normalizePort(port, scheme)) + private def effectiveHttpsContext(ctx: Option[HttpsContext]): Option[HttpsContext] = + ctx orElse Some(defaultClientHttpsContext) + + private def sslTlsStage(httpsContext: Option[HttpsContext], role: Role) = + httpsContext match { + case Some(hctx) ⇒ SslTls(hctx.sslContext, hctx.firstSession, role) + case None ⇒ SslTlsPlacebo.forScala + } } object Http extends ExtensionId[HttpExt] with ExtensionIdProvider { @@ -465,3 +570,39 @@ object Http extends ExtensionId[HttpExt] with ExtensionIdProvider { def createExtension(system: ExtendedActorSystem): HttpExt = new HttpExt(system.settings.config getConfig "akka.http")(system) } + +import JavaConverters._ + +case class HttpsContext(sslContext: SSLContext, + enabledCipherSuites: Option[immutable.Seq[String]] = None, + enabledProtocols: Option[immutable.Seq[String]] = None, + clientAuth: Option[ClientAuth] = None, + sslParameters: Option[SSLParameters] = None) extends akka.http.javadsl.HttpsContext { + def firstSession = NegotiateNewSession(enabledCipherSuites, enabledProtocols, clientAuth, sslParameters) + + /** Java API */ + def getSslContext: SSLContext = sslContext + + /** Java API */ + def getEnabledCipherSuites: japi.Option[JCollection[String]] = enabledCipherSuites.map(_.asJavaCollection) + + /** Java API */ + def getEnabledProtocols: japi.Option[JCollection[String]] = enabledProtocols.map(_.asJavaCollection) + + /** Java API */ + def getClientAuth: japi.Option[ClientAuth] = clientAuth + + /** Java API */ + def getSslParameters: japi.Option[SSLParameters] = sslParameters +} + +object HttpsContext { + /** INTERNAL API **/ + private[http] def create(sslContext: SSLContext, + enabledCipherSuites: japi.Option[JCollection[String]], + enabledProtocols: japi.Option[JCollection[String]], + clientAuth: japi.Option[ClientAuth], + sslParameters: japi.Option[SSLParameters]) = + HttpsContext(sslContext, enabledCipherSuites.map(_.asScala.toList), enabledProtocols.map(_.asScala.toList), + clientAuth, sslParameters) +} \ No newline at end of file diff --git a/akka-http-core/src/test/scala/akka/http/impl/engine/client/ConnectionPoolSpec.scala b/akka-http-core/src/test/scala/akka/http/impl/engine/client/ConnectionPoolSpec.scala index 52aca0aa4c..42f4228753 100644 --- a/akka-http-core/src/test/scala/akka/http/impl/engine/client/ConnectionPoolSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/impl/engine/client/ConnectionPoolSpec.scala @@ -5,14 +5,14 @@ package akka.http.impl.engine.client import java.util.concurrent.atomic.AtomicInteger -import akka.http.{ ClientConnectionSettings, ConnectionPoolSettings, ServerSettings } - import scala.concurrent.Await import scala.concurrent.duration._ import scala.util.{ Failure, Success, Try } +import akka.util.ByteString import akka.http.scaladsl.{ TestUtils, Http } import akka.http.impl.util.{ SingletonException, StreamUtils } -import akka.util.ByteString +import akka.http.{ ClientConnectionSettings, ConnectionPoolSettings, ServerSettings } +import akka.stream.io.{ SessionBytes, SendBytes, SslTlsInbound, SslTlsOutbound } import akka.stream.{ BidiShape, ActorFlowMaterializer } import akka.stream.testkit.{ TestPublisher, TestSubscriber, AkkaSpec } import akka.stream.scaladsl._ @@ -174,14 +174,6 @@ class ConnectionPoolSpec extends AkkaSpec("akka.loggers = []\n akka.loglevel = O "The single-request client infrastructure" should { class LocalTestSetup extends TestSetup(ServerSettings(system).copy(rawRequestUriHeader = true), autoAccept = true) - "properly complete a simple request/response cycle with a host header request" in new LocalTestSetup { - val request = HttpRequest(uri = "/abc", headers = List(Host(serverHostName, serverPort))) - val responseFuture = Http().singleRequest(request) - val responseHeaders = Await.result(responseFuture, 1.second).headers - responseHeaders should contain(RawHeader("Req-Uri", s"http://$serverHostName:$serverPort/abc")) - responseHeaders should contain(RawHeader("Req-Raw-Request-URI", "/abc")) - } - "transform absolute request URIs into relative URIs plus host header" in new LocalTestSetup { val request = HttpRequest(uri = s"http://$serverHostName:$serverPort/abc?query#fragment") val responseFuture = Http().singleRequest(request) @@ -205,11 +197,11 @@ class ConnectionPoolSpec extends AkkaSpec("akka.loggers = []\n akka.loglevel = O responseHeaders should contain(RawHeader("Req-Raw-Request-URI", "//foo")) } - "produce an error if the request does not contain a Host-header or an absolute URI" in { + "produce an error if the request does not have an absolute URI" in { val request = HttpRequest(uri = "/foo") val responseFuture = Http().singleRequest(request) val thrown = the[IllegalUriException] thrownBy Await.result(responseFuture, 1.second) - thrown should have message "Cannot establish effective URI of request to `/foo`, request has a relative URI and is missing a `Host` header" + thrown should have message "Cannot determine request scheme and target endpoint as HttpMethod(GET) request to /foo doesn't have an absolute URI" } } @@ -251,9 +243,9 @@ class ConnectionPoolSpec extends AkkaSpec("akka.loggers = []\n akka.loglevel = O val incomingConnections = TestSubscriber.manualProbe[Http.IncomingConnection] val incomingConnectionsSub = { val rawBytesInjection = BidiFlow() { b ⇒ - val top = b.add(Flow[ByteString].map(mapServerSideOutboundRawBytes) + val top = b.add(Flow[SslTlsOutbound].collect[ByteString] { case SendBytes(x) ⇒ mapServerSideOutboundRawBytes(x) } .transform(StreamUtils.recover { case NoErrorComplete ⇒ ByteString.empty })) - val bottom = b.add(Flow[ByteString]) + val bottom = b.add(Flow[ByteString].map(SessionBytes(null, _))) BidiShape(top.inlet, top.outlet, bottom.inlet, bottom.outlet) } val sink = if (autoAccept) Sink.foreach[Http.IncomingConnection](handleConnection) else Sink(incomingConnections) diff --git a/akka-http-core/src/test/scala/akka/http/impl/engine/client/LowLevelOutgoingConnectionSpec.scala b/akka-http-core/src/test/scala/akka/http/impl/engine/client/LowLevelOutgoingConnectionSpec.scala index 31f7405b3e..2c61840b43 100644 --- a/akka-http-core/src/test/scala/akka/http/impl/engine/client/LowLevelOutgoingConnectionSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/impl/engine/client/LowLevelOutgoingConnectionSpec.scala @@ -6,6 +6,7 @@ package akka.http.impl.engine.client import java.net.InetSocketAddress import akka.http.ClientConnectionSettings +import akka.stream.io.{ SessionBytes, SslTlsOutbound, SendBytes } import org.scalatest.Inside import akka.util.ByteString import akka.event.NoLogging @@ -34,7 +35,7 @@ class LowLevelOutgoingConnectionSpec extends AkkaSpec("akka.loggers = []\n akka. | |""") - netInSub.expectRequest(16) + netInSub.expectRequest() sendWireData( """HTTP/1.1 200 OK |Content-Length: 0 @@ -71,7 +72,7 @@ class LowLevelOutgoingConnectionSpec extends AkkaSpec("akka.loggers = []\n akka. expectWireData("XY") sub.sendComplete() - netInSub.expectRequest(16) + netInSub.expectRequest() sendWireData( """HTTP/1.1 200 OK |Content-Length: 0 @@ -96,7 +97,7 @@ class LowLevelOutgoingConnectionSpec extends AkkaSpec("akka.loggers = []\n akka. | |""") - netInSub.expectRequest(16) + netInSub.expectRequest() sendWireData( """HTTP/1.1 200 OK |Transfer-Encoding: chunked @@ -140,7 +141,7 @@ class LowLevelOutgoingConnectionSpec extends AkkaSpec("akka.loggers = []\n akka. | |""") - netInSub.expectRequest(16) + netInSub.expectRequest() sendWireData( """HTTP/1.1 200 OK |Content-Length: 0 @@ -166,7 +167,7 @@ class LowLevelOutgoingConnectionSpec extends AkkaSpec("akka.loggers = []\n akka. | |""") - netInSub.expectRequest(16) + netInSub.expectRequest() sendWireData( """HTTP/1.1 200 OK |Transfer-Encoding: chunked @@ -239,7 +240,6 @@ class LowLevelOutgoingConnectionSpec extends AkkaSpec("akka.loggers = []\n akka. info.summary shouldEqual "HTTP message had declared Content-Length 8 but entity data stream amounts to 2 bytes less" netInSub.sendComplete() responses.expectComplete() - netInSub.expectCancellation() } "catch the entity stream being longer than the Content-Length" in new TestSetup { @@ -265,7 +265,6 @@ class LowLevelOutgoingConnectionSpec extends AkkaSpec("akka.loggers = []\n akka. info.summary shouldEqual "HTTP message had declared Content-Length 8 but entity data stream amounts to more bytes" netInSub.sendComplete() responses.expectComplete() - netInSub.expectCancellation() } "catch illegal response starts" in new TestSetup { @@ -277,7 +276,7 @@ class LowLevelOutgoingConnectionSpec extends AkkaSpec("akka.loggers = []\n akka. | |""") - netInSub.expectRequest(16) + netInSub.expectRequest() sendWireData( """HTTP/1.2 200 OK | @@ -298,7 +297,7 @@ class LowLevelOutgoingConnectionSpec extends AkkaSpec("akka.loggers = []\n akka. | |""") - netInSub.expectRequest(16) + netInSub.expectRequest() sendWireData( """HTTP/1.1 200 OK |Transfer-Encoding: chunked @@ -336,7 +335,7 @@ class LowLevelOutgoingConnectionSpec extends AkkaSpec("akka.loggers = []\n akka. | |""") - netInSub.expectRequest(16) + netInSub.expectRequest() sendWireData("HTTP/1.1 200 OK") netInSub.sendComplete() @@ -363,8 +362,8 @@ class LowLevelOutgoingConnectionSpec extends AkkaSpec("akka.loggers = []\n akka. FlowGraph.closed(OutgoingConnectionBlueprint(remoteAddress, settings, NoLogging)) { implicit b ⇒ client ⇒ import FlowGraph.Implicits._ - Source(netIn) ~> client.in2 - client.out1 ~> Sink(netOut) + Source(netIn) ~> Flow[ByteString].map(SessionBytes(null, _)) ~> client.in2 + client.out1 ~> Flow[SslTlsOutbound].collect { case SendBytes(x) ⇒ x } ~> Sink(netOut) Source(requests) ~> client.in1 client.out2 ~> Sink(responses) }.run() diff --git a/akka-http-core/src/test/scala/akka/http/impl/engine/server/HttpServerTestSetupBase.scala b/akka-http-core/src/test/scala/akka/http/impl/engine/server/HttpServerTestSetupBase.scala index 5e6b3e70b8..2adab65eda 100644 --- a/akka-http-core/src/test/scala/akka/http/impl/engine/server/HttpServerTestSetupBase.scala +++ b/akka-http-core/src/test/scala/akka/http/impl/engine/server/HttpServerTestSetupBase.scala @@ -4,6 +4,8 @@ package akka.http.impl.engine.server +import akka.stream.io.{ SendBytes, SslTlsOutbound, SessionBytes } + import scala.concurrent.duration.FiniteDuration import akka.actor.ActorSystem @@ -11,7 +13,7 @@ import akka.event.NoLogging import akka.util.ByteString import akka.stream.FlowMaterializer -import akka.stream.scaladsl.{ Sink, Source, FlowGraph } +import akka.stream.scaladsl.{ Flow, Sink, Source, FlowGraph } import akka.stream.testkit.{ TestPublisher, TestSubscriber } import akka.http.impl.util._ @@ -36,8 +38,8 @@ abstract class HttpServerTestSetupBase { FlowGraph.closed(HttpServerBluePrint(settings, NoLogging)) { implicit b ⇒ server ⇒ import FlowGraph.Implicits._ - Source(netIn) ~> server.in2 - server.out1 ~> Sink(netOut) + Source(netIn) ~> Flow[ByteString].map(SessionBytes(null, _)) ~> server.in2 + server.out1 ~> Flow[SslTlsOutbound].collect { case SendBytes(x) ⇒ x } ~> Sink(netOut) server.out2 ~> Sink(requests) Source(responses) ~> server.in1 }.run() diff --git a/akka-http-core/src/test/scala/akka/http/scaladsl/TestClient.scala b/akka-http-core/src/test/scala/akka/http/scaladsl/TestClient.scala index 6958f0b80c..4e859e9497 100644 --- a/akka-http-core/src/test/scala/akka/http/scaladsl/TestClient.scala +++ b/akka-http-core/src/test/scala/akka/http/scaladsl/TestClient.scala @@ -14,16 +14,16 @@ import akka.http.impl.util._ object TestClient extends App { val testConf: Config = ConfigFactory.parseString(""" - akka.loglevel = INFO + akka.loglevel = DEBUG akka.log-dead-letters = off - akka.io.tcp.trace-logging = on""") + akka.io.tcp.trace-logging = off""") implicit val system = ActorSystem("ServerTest", testConf) implicit val fm = ActorFlowMaterializer() import system.dispatcher installEventStreamLoggerFor[UnhandledMessage] - val host = "spray.io" + val host = "github.com" fetchServerVersion1() @@ -31,9 +31,9 @@ object TestClient extends App { // system.shutdown() def fetchServerVersion1(): Unit = { - println(s"Fetching HTTP server version of host `$host` via a direct low-level connection ...") + println(s"Fetching HTTPS server version of host `$host` via a direct low-level connection ...") - val connection = Http().outgoingConnection(host) + val connection = Http().outgoingConnectionTls(host) val result = Source.single(HttpRequest()).via(connection).runWith(Sink.head) result.map(_.header[headers.Server]) onComplete { case Success(res) ⇒ @@ -50,7 +50,7 @@ object TestClient extends App { def fetchServerVersion2(): Unit = { println(s"Fetching HTTP server version of host `$host` via the high-level API ...") - val result = Http().singleRequest(HttpRequest(uri = s"http://$host/")) + val result = Http().singleRequest(HttpRequest(uri = s"https://$host/")) result.map(_.header[headers.Server]) onComplete { case Success(res) ⇒ println(s"$host is running ${res mkString ", "}") diff --git a/akka-stream/src/main/scala/akka/stream/io/SslTls.scala b/akka-stream/src/main/scala/akka/stream/io/SslTls.scala index 52187e742c..19c23bf0f3 100644 --- a/akka-stream/src/main/scala/akka/stream/io/SslTls.scala +++ b/akka-stream/src/main/scala/akka/stream/io/SslTls.scala @@ -49,6 +49,9 @@ import java.security.cert.Certificate */ object SslTls { + type ScalaFlow = scaladsl.BidiFlow[SslTlsOutbound, ByteString, ByteString, SslTlsInbound, Unit] + type JavaFlow = javadsl.BidiFlow[SslTlsOutbound, ByteString, ByteString, SslTlsInbound, Unit] + /** * Scala API: create a StreamTls [[akka.stream.scaladsl.BidiFlow]]. The * SSLContext will be used to create an SSLEngine to which then the @@ -61,7 +64,7 @@ object SslTls { * For a description of the `closing` parameter please refer to [[Closing]]. */ def apply(sslContext: SSLContext, firstSession: NegotiateNewSession, - role: Role, closing: Closing = IgnoreComplete): scaladsl.BidiFlow[SslTlsOutbound, ByteString, ByteString, SslTlsInbound, Unit] = + role: Role, closing: Closing = IgnoreComplete): ScalaFlow = new scaladsl.BidiFlow(TlsModule(OperationAttributes.none, sslContext, firstSession, role, closing)) /** @@ -75,7 +78,7 @@ object SslTls { * * This method uses the default closing behavior or [[IgnoreComplete]]. */ - def create(sslContext: SSLContext, firstSession: NegotiateNewSession, role: Role): javadsl.BidiFlow[SslTlsOutbound, ByteString, ByteString, SslTlsInbound, Unit] = + def create(sslContext: SSLContext, firstSession: NegotiateNewSession, role: Role): JavaFlow = new javadsl.BidiFlow(apply(sslContext, firstSession, role)) /** @@ -89,7 +92,7 @@ object SslTls { * * For a description of the `closing` parameter please refer to [[Closing]]. */ - def create(sslContext: SSLContext, firstSession: NegotiateNewSession, role: Role, closing: Closing): javadsl.BidiFlow[SslTlsOutbound, ByteString, ByteString, SslTlsInbound, Unit] = + def create(sslContext: SSLContext, firstSession: NegotiateNewSession, role: Role, closing: Closing): JavaFlow = new javadsl.BidiFlow(apply(sslContext, firstSession, role, closing)) /** @@ -138,14 +141,16 @@ object SslTls { * unwrapping [[SendBytes]]. */ object SslTlsPlacebo { - val forScala = scaladsl.BidiFlow() { implicit b ⇒ - // this constructs a session for (invalid) protocol SSL_NULL_WITH_NULL_NULL - val session = SSLContext.getDefault.createSSLEngine.getSession - val top = b.add(scaladsl.Flow[SslTlsOutbound].collect { case SendBytes(b) ⇒ b }) - val bottom = b.add(scaladsl.Flow[ByteString].map(SessionBytes(session, _))) - BidiShape(top, bottom) - } - val forJava = new javadsl.BidiFlow(forScala) + val forScala: scaladsl.BidiFlow[SslTlsOutbound, ByteString, ByteString, SessionBytes, Unit] = + scaladsl.BidiFlow() { implicit b ⇒ + // this constructs a session for (invalid) protocol SSL_NULL_WITH_NULL_NULL + val session = SSLContext.getDefault.createSSLEngine.getSession + val top = b.add(scaladsl.Flow[SslTlsOutbound].collect { case SendBytes(b) ⇒ b }) + val bottom = b.add(scaladsl.Flow[ByteString].map(SessionBytes(session, _))) + BidiShape(top, bottom) + } + val forJava: javadsl.BidiFlow[SslTlsOutbound, ByteString, ByteString, SessionBytes, Unit] = + new javadsl.BidiFlow(forScala) } /** @@ -206,7 +211,7 @@ case object Server extends Server * - [[EagerClose]] means to not ignore signals * - [[IgnoreCancel]] means to not react to cancellation of the receiving * side unless the sending side has already completed - * - [[IgnoreComplete]] means to not reacto the completion of the sending + * - [[IgnoreComplete]] means to not react to the completion of the sending * side unless the receiving side has already cancelled * - [[IgnoreBoth]] means to ignore the first termination signal—be that * cancellation or completion—and only act upon the second one