diff --git a/akka-http-core/src/main/scala/akka/http/ConnectionPoolSettings.scala b/akka-http-core/src/main/scala/akka/http/ConnectionPoolSettings.scala index 2153cc8ec1..3ce74e6cc9 100644 --- a/akka-http-core/src/main/scala/akka/http/ConnectionPoolSettings.scala +++ b/akka-http-core/src/main/scala/akka/http/ConnectionPoolSettings.scala @@ -16,7 +16,7 @@ final case class HostConnectionPoolSetup(host: String, port: Int, setup: Connect final case class ConnectionPoolSetup( settings: ConnectionPoolSettings, - conContext: ConnectionContext = ConnectionContext.noEncryption(), + connectionContext: ConnectionContext = ConnectionContext.noEncryption(), log: LoggingAdapter) object ConnectionPoolSetup { diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolInterfaceActor.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolInterfaceActor.scala index 1b20b79fa1..73d7edd4f0 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolInterfaceActor.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolInterfaceActor.scala @@ -64,8 +64,8 @@ private class PoolInterfaceActor(hcps: HostConnectionPoolSetup, import hcps._ import setup._ - val connectionFlow = conContext match { - case httpsContext: HttpsConnectionContext ⇒ Http().outgoingConnectionTls(host, port, httpsContext, None, settings.connectionSettings, setup.log) + val connectionFlow = connectionContext match { + case httpsContext: HttpsConnectionContext ⇒ Http().outgoingConnectionHttps(host, port, httpsContext, None, settings.connectionSettings, setup.log) case _ ⇒ Http().outgoingConnection(host, port, None, settings.connectionSettings, setup.log) } @@ -148,7 +148,7 @@ private class PoolInterfaceActor(hcps: HostConnectionPoolSetup, } def dispatchRequest(pr: PoolRequest): Unit = { - val scheme = Uri.httpScheme(hcps.setup.conContext.isSecure) + val scheme = Uri.httpScheme(hcps.setup.connectionContext.isSecure) val hostHeader = headers.Host(hcps.host, Uri.normalizePort(hcps.port, scheme)) val effectiveRequest = pr.request diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/rendering/HttpResponseRendererFactory.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/rendering/HttpResponseRendererFactory.scala index 846b950181..2cee492af4 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/rendering/HttpResponseRendererFactory.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/rendering/HttpResponseRendererFactory.scala @@ -63,10 +63,10 @@ private[http] class HttpResponseRendererFactory(serverHeader: Option[headers.Ser def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { - private[this] var closeMode: CloseMode = DontClose // signals what to do after the current response - private[this] def close: Boolean = closeMode != DontClose - private[this] def closeIf(cond: Boolean): Unit = - if (cond) closeMode = CloseConnection + var closeMode: CloseMode = DontClose // signals what to do after the current response + def close: Boolean = closeMode != DontClose + def closeIf(cond: Boolean): Unit = if (cond) closeMode = CloseConnection + var transferring = false setHandler(in, new InHandler { override def onPush(): Unit = @@ -286,4 +286,4 @@ private[http] sealed trait ResponseRenderingOutput private[http] object ResponseRenderingOutput { private[http] case class HttpData(bytes: ByteString) extends ResponseRenderingOutput private[http] case class SwitchToWebSocket(httpResponseBytes: ByteString, handler: Either[Graph[FlowShape[FrameEvent, FrameEvent], Any], Graph[FlowShape[Message, Message], Any]]) extends ResponseRenderingOutput -} +} \ No newline at end of file diff --git a/akka-http-core/src/main/scala/akka/http/javadsl/ConnectHttp.scala b/akka-http-core/src/main/scala/akka/http/javadsl/ConnectHttp.scala new file mode 100644 index 0000000000..db2540fbd1 --- /dev/null +++ b/akka-http-core/src/main/scala/akka/http/javadsl/ConnectHttp.scala @@ -0,0 +1,107 @@ +/* + * Copyright (C) 2016 Typesafe Inc. + */ +package akka.http.javadsl + +import java.util.Locale +import java.util.Optional + +import akka.http.javadsl.model.Uri + +abstract class ConnectHttp { + def host: String + def port: Int + + def isHttps: Boolean + def connectionContext: Optional[HttpsConnectionContext] + + final def effectiveConnectionContext(fallbackContext: HttpsConnectionContext): HttpsConnectionContext = + connectionContext.orElse(fallbackContext) +} + +object ConnectHttp { + + // TODO may be optimised a bit to avoid parsing the Uri entirely for the known port cases + + /** Extracts host data from given Uri. */ + def toHost(uriHost: Uri): ConnectHttp = { + val s = uriHost.scheme.toLowerCase(Locale.ROOT) + if (s == "https") new ConnectHttpsImpl(uriHost.host.address, uriHost.port) + else new ConnectHttpImpl(uriHost.host.address, uriHost.port) + } + + def toHost(host: String): ConnectHttp = + toHost(Uri.create(host)) + + def toHost(host: String, port: Int): ConnectHttp = { + require(port > 0, "port must be > 0") + toHost(Uri.create(host).port(port)) + } + + /** + * Extracts host data from given Uri. + * Forces an HTTPS connection to the given host, using the default HTTPS context and default port. + */ + @throws(classOf[IllegalArgumentException]) + def toHostHttps(uriHost: Uri): ConnectHttp.UsingHttps = { + val s = uriHost.scheme.toLowerCase(Locale.ROOT) + require(s == "" || s == "https", "toHostHttps used with non https scheme! Was: " + uriHost) + val httpsHost = uriHost.scheme("https") // for effective port calculation + new ConnectHttpsImpl(httpsHost.host.address, effectivePort(uriHost)) + } + + /** Forces an HTTPS connection to the given host, using the default HTTPS context and default port. */ + @throws(classOf[IllegalArgumentException]) + def toHostHttps(host: String): ConnectHttp.UsingHttps = + toHostHttps(Uri.create(host)) + + /** Forces an HTTPS connection to the given host, using the default HTTPS context and given port. */ + @throws(classOf[IllegalArgumentException]) + def toHostHttps(host: String, port: Int): ConnectHttp.UsingHttps = { + require(port > 0, "port must be > 0") + toHostHttps(Uri.create(host).port(port).host.address) + } + + private def effectivePort(uri: Uri): Int = { + val s = uri.scheme.toLowerCase(Locale.ROOT) + effectivePort(s, -1) + } + + private def effectivePort(scheme: String, port: Int): Int = { + val s = scheme.toLowerCase(Locale.ROOT) + if (port > 0) port + else if (s == "https" || s == "wss") 443 + else if (s == "http" || s == "ws") 80 + else throw new IllegalArgumentException("Scheme is not http/https/ws/wss and no port given!") + } + + trait UsingHttps extends ConnectHttp { + def withCustomHttpsContext(context: HttpsConnectionContext): ConnectHttp.UsingCustomHttps + } + + trait UsingCustomHttps extends ConnectHttp { + def withDefaultContext: ConnectHttp.UsingHttps + } + +} + +/** INTERNAL API */ +final class ConnectHttpImpl(val host: String, val port: Int) extends ConnectHttp { + def isHttps: Boolean = false + + def connectionContext: Optional[HttpsConnectionContext] = Optional.empty() +} + +final class ConnectHttpsImpl(val host: String, val port: Int, val context: Optional[HttpsConnectionContext] = Optional.empty()) + extends ConnectHttp with ConnectHttp.UsingHttps with ConnectHttp.UsingCustomHttps { + + override def isHttps: Boolean = true + + override def withCustomHttpsContext(context: HttpsConnectionContext): ConnectHttp.UsingCustomHttps = + new ConnectHttpsImpl(host, port, Optional.of(context)) + + override def withDefaultContext: ConnectHttp.UsingHttps = + new ConnectHttpsImpl(host, port, Optional.empty()) + + override def connectionContext: Optional[HttpsConnectionContext] = context +} diff --git a/akka-http-core/src/main/scala/akka/http/javadsl/ConnectionContext.scala b/akka-http-core/src/main/scala/akka/http/javadsl/ConnectionContext.scala index 20556f778a..c54a6b8a19 100644 --- a/akka-http-core/src/main/scala/akka/http/javadsl/ConnectionContext.scala +++ b/akka-http-core/src/main/scala/akka/http/javadsl/ConnectionContext.scala @@ -4,11 +4,9 @@ package akka.http.javadsl import java.util.{ Collection ⇒ JCollection, Optional } -import java.{ util ⇒ ju } import javax.net.ssl.{ SSLContext, SSLParameters } - import akka.http.scaladsl -import com.typesafe.sslconfig.ssl.ClientAuth +import akka.stream.io.ClientAuth import scala.compat.java8.OptionConverters @@ -27,18 +25,18 @@ object ConnectionContext { scaladsl.ConnectionContext.noEncryption() } -trait ConnectionContext { +abstract class ConnectionContext { def isSecure: Boolean /** Java API */ def getDefaultPort: Int } -trait HttpConnectionContext extends ConnectionContext { +trait HttpConnectionContext extends akka.http.javadsl.ConnectionContext { override final def isSecure = false override final def getDefaultPort = 80 } -trait HttpsConnectionContext extends ConnectionContext { +trait HttpsConnectionContext extends akka.http.javadsl.ConnectionContext { override final def isSecure = true override final def getDefaultPort = 443 diff --git a/akka-http-core/src/main/scala/akka/http/javadsl/Http.scala b/akka-http-core/src/main/scala/akka/http/javadsl/Http.scala index e977a19e10..183777255c 100644 --- a/akka-http-core/src/main/scala/akka/http/javadsl/Http.scala +++ b/akka-http-core/src/main/scala/akka/http/javadsl/Http.scala @@ -7,8 +7,9 @@ package akka.http.javadsl import java.net.InetSocketAddress import java.util.Optional import akka.http.impl.util.JavaMapping +import akka.http.impl.util.JavaMapping.HttpsConnectionContext import akka.http.javadsl.model.ws._ -import akka.{ NotUsed, stream } +import akka.{ stream, NotUsed } import akka.stream.io.{ SslTlsInbound, SslTlsOutbound } import scala.language.implicitConversions @@ -103,10 +104,10 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension { * [[ServerBinding]]. */ def bind(interface: String, port: Int, - settings: ServerSettings, connectionContext: ConnectionContext, + settings: ServerSettings, materializer: Materializer): Source[IncomingConnection, Future[ServerBinding]] = - new Source(delegate.bind(interface, port, settings, connectionContext = ConnectionContext.noEncryption().asScala)(materializer) + new Source(delegate.bind(interface, port, settings = settings, connectionContext = ConnectionContext.noEncryption().asScala)(materializer) .map(new IncomingConnection(_)) .mapMaterializedValue(_.map(new ServerBinding(_))(ec))) @@ -123,11 +124,30 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension { * [[ServerBinding]]. */ def bind(interface: String, port: Int, - settings: ServerSettings, connectionContext: ConnectionContext, + materializer: Materializer): Source[IncomingConnection, Future[ServerBinding]] = + new Source(delegate.bind(interface, port, connectionContext = connectionContext.asScala)(materializer) + .map(new IncomingConnection(_)) + .mapMaterializedValue(_.map(new ServerBinding(_))(ec))) + + /** + * Creates a [[Source]] of [[IncomingConnection]] instances which represents a prospective HTTP server binding + * on the given `endpoint`. + * + * If the given port is 0 the resulting source can be materialized several times. Each materialization will + * then be assigned a new local port by the operating system, which can then be retrieved by the materialized + * [[ServerBinding]]. + * + * If the given port is non-zero subsequent materialization attempts of the produced source will immediately + * fail, unless the first materialization has already been unbound. Unbinding can be triggered via the materialized + * [[ServerBinding]]. + */ + def bind(interface: String, port: Int, + connectionContext: ConnectionContext, + settings: ServerSettings, log: LoggingAdapter, materializer: Materializer): Source[IncomingConnection, Future[ServerBinding]] = - new Source(delegate.bind(interface, port, settings, connectionContext = ConnectionContext.noEncryption().asScala, log)(materializer) + new Source(delegate.bind(interface, port, ConnectionContext.noEncryption().asScala, settings, log)(materializer) .map(new IncomingConnection(_)) .mapMaterializedValue(_.map(new ServerBinding(_))(ec))) @@ -288,61 +308,22 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension { /** * Creates a [[Flow]] representing a prospective HTTP client connection to the given endpoint. * Every materialization of the produced flow will attempt to establish a new outgoing connection. + * + * If the hostname is given with an `https://` prefix, the default [[HttpsConnectionContext]] will be used. */ def outgoingConnection(host: String): Flow[HttpRequest, HttpResponse, Future[OutgoingConnection]] = - outgoingConnection(host, 80) + outgoingConnection(ConnectHttp.toHost(host)) /** * Creates a [[Flow]] representing a prospective HTTP client connection to the given endpoint. * Every materialization of the produced flow will attempt to establish a new outgoing connection. * - * The [[defaultClientHttpsContext]] is used to configure TLS for the connection. + * Use the [[ConnectHttp]] DSL to configure target host and whether HTTPS should be used. */ - def outgoingConnectionTls(host: String): Flow[HttpRequest, HttpResponse, Future[OutgoingConnection]] = - outgoingConnectionTls(host, defaultClientHttpsContext) - - /** - * Creates a [[Flow]] representing a prospective HTTP client connection to the given endpoint. - * Every materialization of the produced flow will attempt to establish a new outgoing connection. - */ - def outgoingConnectionTls(host: String, connectionContext: HttpsConnectionContext): Flow[HttpRequest, HttpResponse, Future[OutgoingConnection]] = - outgoingConnectionTls(host, 443, connectionContext) - - /** - * Creates a [[Flow]] representing a prospective HTTP client connection to the given endpoint. - * Every materialization of the produced flow will attempt to establish a new outgoing connection. - */ - def outgoingConnection(host: String, port: Int): Flow[HttpRequest, HttpResponse, Future[OutgoingConnection]] = - Flow.fromGraph { - akka.stream.scaladsl.Flow[HttpRequest].map(_.asScala) - .viaMat(delegate.outgoingConnection(host, port))(Keep.right) - .mapMaterializedValue(_.map(new OutgoingConnection(_))(ec)) - } - - /** - * Creates a [[Flow]] representing a prospective HTTP client connection to the given endpoint. - * Every materialization of the produced flow will attempt to establish a new outgoing connection. - * - * The [[defaultClientHttpsContext]] is used to configure TLS for the connection. - */ - def outgoingConnectionTls(host: String, port: Int): Flow[HttpRequest, HttpResponse, Future[OutgoingConnection]] = - Flow.fromGraph { - akka.stream.scaladsl.Flow[HttpRequest].map(_.asScala) - .viaMat(delegate.outgoingConnection(host, port))(Keep.right) - .mapMaterializedValue(_.map(new OutgoingConnection(_))(ec)) - } - - /** - * Creates a [[Flow]] representing a prospective HTTP client connection to the given endpoint. - * Every materialization of the produced flow will attempt to establish a new outgoing connection. - * - * The given [[HttpsConnectionContext]] will be used for encryption on the connection. - */ - def outgoingConnectionTls(host: String, port: Int, connectionContext: HttpsConnectionContext): Flow[HttpRequest, HttpResponse, Future[OutgoingConnection]] = - Flow.fromGraph { - akka.stream.scaladsl.Flow[HttpRequest].map(_.asScala) - .viaMat(delegate.outgoingConnectionTls(host, port, connectionContext.asScala))(Keep.right) - .mapMaterializedValue(_.map(new OutgoingConnection(_))(ec)) + def outgoingConnection(to: ConnectHttp): Flow[HttpRequest, HttpResponse, Future[OutgoingConnection]] = + adaptOutgoingFlow { + if (to.isHttps) delegate.outgoingConnectionHttps(to.host, to.port, to.effectiveConnectionContext(defaultClientHttpsContext).asScala) + else delegate.outgoingConnection(to.host, to.port) } /** @@ -350,46 +331,15 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension { * Every materialization of the produced flow will attempt to establish a new outgoing connection. */ def outgoingConnection(host: String, port: Int, + connectionContext: ConnectionContext, localAddress: Optional[InetSocketAddress], settings: ClientConnectionSettings, log: LoggingAdapter): Flow[HttpRequest, HttpResponse, Future[OutgoingConnection]] = - Flow.fromGraph { - akka.stream.scaladsl.Flow[HttpRequest].map(_.asScala) - .viaMat(delegate.outgoingConnection(host, port, localAddress.asScala, settings, log))(Keep.right) - .mapMaterializedValue(_.map(new OutgoingConnection(_))(ec)) - } - - /** - * Creates a [[Flow]] representing a prospective HTTP client connection to the given endpoint. - * Every materialization of the produced flow will attempt to establish a new outgoing connection. - * - * The [[defaultClientHttpsContext]] is used to configure TLS for the connection. - */ - def outgoingConnectionTls(host: String, port: Int, - localAddress: Optional[InetSocketAddress], - settings: ClientConnectionSettings, - log: LoggingAdapter): Flow[HttpRequest, HttpResponse, Future[OutgoingConnection]] = - Flow.fromGraph { - akka.stream.scaladsl.Flow[HttpRequest].map(_.asScala) - .viaMat(delegate.outgoingConnectionTls(host, port, defaultClientHttpsContext.asScala, localAddress.asScala, settings, log))(Keep.right) - .mapMaterializedValue(_.map(new OutgoingConnection(_))(ec)) - } - - /** - * Creates a [[Flow]] representing a prospective HTTP client connection to the given endpoint. - * Every materialization of the produced flow will attempt to establish a new outgoing connection. - * - * The given [[HttpsConnectionContext]] will be used for encryption on the connection. - */ - def outgoingConnectionTls(host: String, port: Int, - connectionContext: HttpsConnectionContext, - localAddress: Optional[InetSocketAddress], - settings: ClientConnectionSettings, - log: LoggingAdapter): Flow[HttpRequest, HttpResponse, Future[OutgoingConnection]] = - Flow.fromGraph { - akka.stream.scaladsl.Flow[HttpRequest].map(_.asScala) - .viaMat(delegate.outgoingConnectionTls(host, port, connectionContext.asScala, localAddress.asScala, settings, log))(Keep.right) - .mapMaterializedValue(_.map(new OutgoingConnection(_))(ec)) + adaptOutgoingFlow { + connectionContext match { + case https: HttpsConnectionContext ⇒ delegate.outgoingConnectionHttps(host, port, https.asScala, localAddress.asScala, settings, log) + case _ ⇒ delegate.outgoingConnection(host, port, localAddress.asScala, settings, log) + } } /** @@ -406,28 +356,8 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension { * In order to allow for easy response-to-request association the flow takes in a custom, opaque context * object of type `T` from the application which is emitted together with the corresponding response. */ - def newHostConnectionPool[T](host: String, port: Int, - materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], HostConnectionPool] = - adaptTupleFlow(delegate.newHostConnectionPool[T](host, port)(materializer)) - - /** - * Same as [[newHostConnectionPool]] but with HTTPS encryption. - * - * The [[defaultClientHttpsContext]] is used to configure TLS for the connection. - */ - def newHostConnectionPoolTls[T](host: String, port: Int, - materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], HostConnectionPool] = - adaptTupleFlow(delegate.newHostConnectionPoolTls[T](host, port)(materializer)) - - /** - * Same as [[newHostConnectionPool]] but with HTTPS encryption. - * - * The given [[HttpsConnectionContext]] will be used for encryption on the connection. - */ - def newHostConnectionPoolTls[T](host: String, port: Int, - connectionContext: HttpsConnectionContext, - materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], HostConnectionPool] = - adaptTupleFlow(delegate.newHostConnectionPoolTls[T](host, port, connectionContext.asScala)(materializer)) + def newHostConnectionPool[T](host: String, materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], HostConnectionPool] = + newHostConnectionPool[T](ConnectHttp.toHost(host), materializer) /** * Starts a new connection pool to the given host and configuration and returns a [[Flow]] which dispatches @@ -443,31 +373,23 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension { * In order to allow for easy response-to-request association the flow takes in a custom, opaque context * object of type `T` from the application which is emitted together with the corresponding response. */ - def newHostConnectionPool[T](host: String, port: Int, + def newHostConnectionPool[T](to: ConnectHttp, materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], HostConnectionPool] = + adaptTupleFlow(delegate.newHostConnectionPool[T](to.host, to.port)(materializer)) + + /** + * Same as [[newHostConnectionPool]] but with HTTPS encryption. + * + * The given [[ConnectionContext]] will be used for encryption on the connection. + */ + def newHostConnectionPool[T](to: ConnectHttp, settings: ConnectionPoolSettings, log: LoggingAdapter, materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], HostConnectionPool] = - adaptTupleFlow(delegate.newHostConnectionPool[T](host, port, settings, log)(materializer)) - - /** - * Same as [[newHostConnectionPool]] but with HTTPS encryption. - * - * The [[defaultClientHttpsContext]] is used to configure TLS for the connection. - */ - def newHostConnectionPoolTls[T](host: String, port: Int, - settings: ConnectionPoolSettings, - log: LoggingAdapter, materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], HostConnectionPool] = - adaptTupleFlow(delegate.newHostConnectionPoolTls[T](host, port, defaultClientHttpsContext.asScala, settings, log)(materializer)) - - /** - * Same as [[newHostConnectionPool]] but with HTTPS encryption. - * - * The given [[HttpsConnectionContext]] will be used for encryption on the connection. - */ - def newHostConnectionPoolTls[T](host: String, port: Int, - connectionContext: HttpsConnectionContext, - settings: ConnectionPoolSettings, - log: LoggingAdapter, materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], HostConnectionPool] = - adaptTupleFlow(delegate.newHostConnectionPoolTls[T](host, port, connectionContext.asScala, settings, log)(materializer)) + adaptTupleFlow { + to.effectiveConnectionContext(defaultClientHttpsContext) match { + case https: HttpsConnectionContext ⇒ delegate.newHostConnectionPoolHttps[T](to.host, to.port, https.asScala, settings, log)(materializer) + case _ ⇒ delegate.newHostConnectionPool[T](to.host, to.port, settings, log)(materializer) + } + } /** * Returns a [[Flow]] which dispatches incoming HTTP requests to the per-ActorSystem pool of outgoing @@ -486,16 +408,8 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension { * In order to allow for easy response-to-request association the flow takes in a custom, opaque context * object of type `T` from the application which is emitted together with the corresponding response. */ - def cachedHostConnectionPool[T](host: String, port: Int, materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], HostConnectionPool] = - adaptTupleFlow(delegate.cachedHostConnectionPool[T](host, port)(materializer)) - - /** - * Same as [[cachedHostConnectionPool]] but with HTTPS encryption. - * - * The [[defaultClientHttpsContext]] is used to configure TLS for the connection. - */ - def cachedHostConnectionPoolTls[T](host: String, port: Int, materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], HostConnectionPool] = - adaptTupleFlow(delegate.cachedHostConnectionPoolTls[T](host, port, defaultClientHttpsContext.asScala)(materializer)) + def cachedHostConnectionPool[T](host: String, materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], HostConnectionPool] = + cachedHostConnectionPool(ConnectHttp.toHost(host), materializer) /** * Returns a [[Flow]] which dispatches incoming HTTP requests to the per-ActorSystem pool of outgoing @@ -514,21 +428,18 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension { * In order to allow for easy response-to-request association the flow takes in a custom, opaque context * object of type `T` from the application which is emitted together with the corresponding response. */ - def cachedHostConnectionPool[T](host: String, port: Int, - settings: ConnectionPoolSettings, - log: LoggingAdapter, materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], HostConnectionPool] = - adaptTupleFlow(delegate.cachedHostConnectionPool[T](host, port, settings, log)(materializer)) + def cachedHostConnectionPool[T](to: ConnectHttp, materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], HostConnectionPool] = + adaptTupleFlow(delegate.cachedHostConnectionPool[T](to.host, to.port)(materializer)) /** * Same as [[cachedHostConnectionPool]] but with HTTPS encryption. * - * The given [[HttpsConnectionContext]] will be used for encryption on the connection. + * The given [[ConnectionContext]] will be used for encryption on the connection. */ - def cachedHostConnectionPoolTls[T](host: String, port: Int, - settings: ConnectionPoolSettings, - connectionContext: HttpsConnectionContext, - log: LoggingAdapter, materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], HostConnectionPool] = - adaptTupleFlow(delegate.cachedHostConnectionPoolTls[T](host, port, connectionContext.asScala, settings, log)(materializer)) + def cachedHostConnectionPool[T](to: ConnectHttp, + settings: ConnectionPoolSettings, + log: LoggingAdapter, materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], HostConnectionPool] = + adaptTupleFlow(delegate.cachedHostConnectionPoolHttps[T](to.host, to.port, to.effectiveConnectionContext(defaultClientHttpsContext).asScala, settings, log)(materializer)) /** * Creates a new "super connection pool flow", which routes incoming requests to a (cached) host connection pool @@ -596,6 +507,18 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension { def singleRequest(request: HttpRequest, materializer: Materializer): Future[HttpResponse] = delegate.singleRequest(request.asScala)(materializer) + /** + * Fires a single [[HttpRequest]] across the (cached) host connection pool for the request's + * effective URI to produce a response future. + * + * The [[defaultClientHttpsContext]] is used to configure TLS for the connection. + * + * Note that the request must have either an absolute URI or a valid `Host` header, otherwise + * the future will be completed with an error. + */ + def singleRequest(request: HttpRequest, connectionContext: HttpsConnectionContext, materializer: Materializer): Future[HttpResponse] = + delegate.singleRequest(request.asScala, connectionContext.asScala)(materializer) + /** * Fires a single [[HttpRequest]] across the (cached) host connection pool for the request's * effective URI to produce a response future. @@ -656,7 +579,7 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension { * The layer is not reusable and must only be materialized once. */ def webSocketClientFlow(request: WebSocketRequest, - connectionContext: HttpsConnectionContext, + connectionContext: ConnectionContext, localAddress: Optional[InetSocketAddress], settings: ClientConnectionSettings, log: LoggingAdapter): Flow[Message, Message, Future[WebSocketUpgradeResponse]] = @@ -687,7 +610,7 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension { */ def singleWebSocketRequest[T](request: WebSocketRequest, clientFlow: Flow[Message, Message, T], - connectionContext: HttpsConnectionContext, + connectionContext: ConnectionContext, materializer: Materializer): Pair[Future[WebSocketUpgradeResponse], T] = adaptWsResultTuple { delegate.singleWebSocketRequest( @@ -702,7 +625,7 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension { */ def singleWebSocketRequest[T](request: WebSocketRequest, clientFlow: Flow[Message, Message, T], - connectionContext: HttpsConnectionContext, + connectionContext: ConnectionContext, localAddress: Optional[InetSocketAddress], settings: ClientConnectionSettings, log: LoggingAdapter, @@ -728,12 +651,19 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension { def shutdownAllConnectionPools(): Future[Unit] = delegate.shutdownAllConnectionPools() /** - * Gets the current default client-side [[HttpsConnectionContext]]. + * Gets the default + * @return */ - def defaultClientHttpsContext: HttpsConnectionContext = delegate.defaultClientHttpsContext + def defaultServerHttpContext: ConnectionContext = + delegate.defaultServerHttpContext /** - * Sets the default client-side [[HttpsConnectionContext]]. + * Gets the current default client-side [[ConnectionContext]]. + */ + def defaultClientHttpsContext: akka.http.javadsl.HttpsConnectionContext = delegate.defaultClientHttpsContext + + /** + * Sets the default client-side [[ConnectionContext]]. */ def setDefaultClientHttpsContext(context: HttpsConnectionContext): Unit = delegate.setDefaultClientHttpsContext(context.asInstanceOf[akka.http.scaladsl.HttpsConnectionContext]) @@ -743,6 +673,13 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension { JavaMapping.toJava(scalaFlow)(JavaMapping.flowMapping[Pair[HttpRequest, T], (scaladsl.model.HttpRequest, T), Pair[Try[HttpResponse], T], (Try[scaladsl.model.HttpResponse], T), Mat]) } + private def adaptOutgoingFlow[T, Mat](scalaFlow: stream.scaladsl.Flow[scaladsl.model.HttpRequest, scaladsl.model.HttpResponse, Future[scaladsl.Http.OutgoingConnection]]): Flow[HttpRequest, HttpResponse, Future[OutgoingConnection]] = + Flow.fromGraph { + akka.stream.scaladsl.Flow[HttpRequest].map(_.asScala) + .viaMat(scalaFlow)(Keep.right) + .mapMaterializedValue(_.map(new OutgoingConnection(_))(ec)) + } + private def adaptServerLayer(serverLayer: scaladsl.Http.ServerLayer): BidiFlow[HttpResponse, SslTlsOutbound, SslTlsInbound, HttpRequest, NotUsed] = new BidiFlow( JavaMapping.adapterBidiFlow[HttpResponse, sm.HttpResponse, sm.HttpRequest, HttpRequest] diff --git a/akka-http-core/src/main/scala/akka/http/scaladsl/ConnectionContext.scala b/akka-http-core/src/main/scala/akka/http/scaladsl/ConnectionContext.scala index e17508d683..d8d1e14ff6 100644 --- a/akka-http-core/src/main/scala/akka/http/scaladsl/ConnectionContext.scala +++ b/akka-http-core/src/main/scala/akka/http/scaladsl/ConnectionContext.scala @@ -4,14 +4,12 @@ package akka.http.scaladsl -import akka.stream.io.NegotiateNewSession +import akka.stream.io.{ ClientAuth, NegotiateNewSession } import scala.collection.JavaConverters._ import java.util.{ Optional, Collection ⇒ JCollection } import javax.net.ssl._ -import com.typesafe.sslconfig.ssl.ClientAuth - import scala.collection.immutable import scala.compat.java8.OptionConverters._ diff --git a/akka-http-core/src/main/scala/akka/http/scaladsl/Http.scala b/akka-http-core/src/main/scala/akka/http/scaladsl/Http.scala index 7b544682cf..15be634b37 100644 --- a/akka-http-core/src/main/scala/akka/http/scaladsl/Http.scala +++ b/akka-http-core/src/main/scala/akka/http/scaladsl/Http.scala @@ -27,7 +27,7 @@ import akka.stream.scaladsl._ import com.typesafe.config.Config import com.typesafe.sslconfig.akka._ import com.typesafe.sslconfig.akka.util.AkkaLoggerFactory -import com.typesafe.sslconfig.ssl._ +import com.typesafe.sslconfig.ssl.ConfigSSLContextBuilder import scala.concurrent.{ ExecutionContext, Future, Promise, TimeoutException } import scala.util.Try @@ -42,7 +42,8 @@ class HttpExt(private val config: Config)(implicit val system: ActorSystem) exte // configured default HttpsContext for the client-side // SYNCHRONIZED ACCESS ONLY! - private[this] var _defaultClientHttpsContext: HttpsConnectionContext = _ + private[this] var _defaultClientHttpsConnectionContext: HttpsConnectionContext = _ + private[this] var _defaultServerConnectionContext: ConnectionContext = _ // ** SERVER ** // @@ -70,8 +71,8 @@ class HttpExt(private val config: Config)(implicit val system: ActorSystem) exte * use the `akka.http.server` config section or pass in a [[ServerSettings]] explicitly. */ def bind(interface: String, port: Int = DefaultPortForProtocol, - settings: ServerSettings = ServerSettings(system), connectionContext: ConnectionContext = defaultServerHttpContext, + settings: ServerSettings = ServerSettings(system), log: LoggingAdapter = system.log)(implicit fm: Materializer): Source[IncomingConnection, Future[ServerBinding]] = { val effectivePort = if (port >= 0) port else connectionContext.defaultPort val tlsStage = sslTlsStage(connectionContext, Server) @@ -114,7 +115,7 @@ class HttpExt(private val config: Config)(implicit val system: ActorSystem) exte throw e } - bind(interface, port, settings, connectionContext, log) + bind(interface, port, connectionContext, settings, log) .mapAsyncUnordered(settings.maxConnections) { connection ⇒ handleOneConnection(connection).recoverWith { // Ignore incoming errors from the connection as they will cancel the binding. @@ -207,11 +208,11 @@ class HttpExt(private val config: Config)(implicit val system: ActorSystem) exte * To configure additional settings for requests made using this method, * use the `akka.http.client` config section or pass in a [[ClientConnectionSettings]] explicitly. */ - def outgoingConnectionTls(host: String, port: Int = 443, - connectionContext: HttpsConnectionContext = defaultClientHttpsContext, - localAddress: Option[InetSocketAddress] = None, - settings: ClientConnectionSettings = ClientConnectionSettings(system), - log: LoggingAdapter = system.log): Flow[HttpRequest, HttpResponse, Future[OutgoingConnection]] = + def outgoingConnectionHttps(host: String, port: Int = 443, + connectionContext: HttpsConnectionContext = defaultClientHttpsContext, + localAddress: Option[InetSocketAddress] = None, + settings: ClientConnectionSettings = ClientConnectionSettings(system), + log: LoggingAdapter = system.log): Flow[HttpRequest, HttpResponse, Future[OutgoingConnection]] = _outgoingConnection(host, port, localAddress, settings, connectionContext, log) private def _outgoingConnection(host: String, port: Int, localAddress: Option[InetSocketAddress], @@ -287,10 +288,10 @@ class HttpExt(private val config: Config)(implicit val system: ActorSystem) exte * To configure additional settings for the pool (and requests made using it), * use the `akka.http.host-connection-pool` config section or pass in a [[ConnectionPoolSettings]] explicitly. */ - def newHostConnectionPoolTls[T](host: String, port: Int = 443, - connectionContext: HttpsConnectionContext = defaultClientHttpsContext, - settings: ConnectionPoolSettings = ConnectionPoolSettings(system), - log: LoggingAdapter = system.log)(implicit fm: Materializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] = { + def newHostConnectionPoolHttps[T](host: String, port: Int = 443, + connectionContext: HttpsConnectionContext = defaultClientHttpsContext, + settings: ConnectionPoolSettings = ConnectionPoolSettings(system), + log: LoggingAdapter = system.log)(implicit fm: Materializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] = { val cps = ConnectionPoolSetup(settings, connectionContext, log) newHostConnectionPool(HostConnectionPoolSetup(host, port, cps)) } @@ -352,10 +353,10 @@ class HttpExt(private val config: Config)(implicit val system: ActorSystem) exte * To configure additional settings for the pool (and requests made using it), * use the `akka.http.host-connection-pool` config section or pass in a [[ConnectionPoolSettings]] explicitly. */ - def cachedHostConnectionPoolTls[T](host: String, port: Int = 443, - connectionContext: HttpsConnectionContext = defaultClientHttpsContext, - settings: ConnectionPoolSettings = ConnectionPoolSettings(system), - log: LoggingAdapter = system.log)(implicit fm: Materializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] = { + def cachedHostConnectionPoolHttps[T](host: String, port: Int = 443, + connectionContext: HttpsConnectionContext = defaultClientHttpsContext, + settings: ConnectionPoolSettings = ConnectionPoolSettings(system), + log: LoggingAdapter = system.log)(implicit fm: Materializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] = { val cps = ConnectionPoolSetup(settings, connectionContext, log) val setup = HostConnectionPoolSetup(host, port, cps) cachedHostConnectionPool(setup) @@ -378,7 +379,7 @@ class HttpExt(private val config: Config)(implicit val system: ActorSystem) exte * In order to allow for easy response-to-request association the flow takes in a custom, opaque context * object of type `T` from the application which is emitted together with the corresponding response. */ - def cachedHostConnectionPool[T](setup: HostConnectionPoolSetup)( + private def cachedHostConnectionPool[T](setup: HostConnectionPoolSetup)( implicit fm: Materializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] = gatewayClientFlow(setup, cachedGateway(setup)) @@ -449,8 +450,9 @@ class HttpExt(private val config: Config)(implicit val system: ActorSystem) exte require(uri.isAbsolute, s"WebSocket request URI must be absolute but was '$uri'") val ctx = uri.scheme match { - case "ws" ⇒ ConnectionContext.noEncryption() - case "wss" ⇒ connectionContext + case "ws" ⇒ ConnectionContext.noEncryption() + case "wss" if connectionContext.isSecure ⇒ connectionContext + case "wss" ⇒ throw new IllegalArgumentException("Provided connectionContext is not secure, yet request to secure `wss` endpoint detected!") case scheme ⇒ throw new IllegalArgumentException(s"Illegal URI scheme '$scheme' in '$uri' for WebSocket request. " + s"WebSocket requests must use either 'ws' or 'wss'") @@ -468,7 +470,7 @@ class HttpExt(private val config: Config)(implicit val system: ActorSystem) exte */ def singleWebSocketRequest[T](request: WebSocketRequest, clientFlow: Flow[Message, Message, T], - connectionContext: HttpsConnectionContext = defaultClientHttpsContext, + connectionContext: ConnectionContext = defaultClientHttpsContext, localAddress: Option[InetSocketAddress] = None, settings: ClientConnectionSettings = ClientConnectionSettings(system), log: LoggingAdapter = system.log)(implicit mat: Materializer): (Future[WebSocketUpgradeResponse], T) = @@ -496,7 +498,20 @@ class HttpExt(private val config: Config)(implicit val system: ActorSystem) exte * Gets the current default server-side [[ConnectionContext]] – defaults to plain HTTP. */ def defaultServerHttpContext: ConnectionContext = - ConnectionContext.noEncryption() + synchronized { + if (_defaultServerConnectionContext == null) + _defaultServerConnectionContext = ConnectionContext.noEncryption() + _defaultServerConnectionContext + } + + /** + * Sets the default server-side [[ConnectionContext]]. + * If it is an instance of [[HttpsConnectionContext]] then the server will be bound using HTTPS. + */ + def setDefaultClientHttpsContext(context: ConnectionContext): Unit = + synchronized { + _defaultServerConnectionContext = context + } /** * Gets the current default client-side [[HttpsConnectionContext]]. @@ -504,10 +519,10 @@ class HttpExt(private val config: Config)(implicit val system: ActorSystem) exte */ def defaultClientHttpsContext: HttpsConnectionContext = synchronized { - _defaultClientHttpsContext match { + _defaultClientHttpsConnectionContext match { case null ⇒ val ctx = createDefaultClientHttpsContext() - _defaultClientHttpsContext = ctx + _defaultClientHttpsConnectionContext = ctx ctx case ctx ⇒ ctx } @@ -518,7 +533,7 @@ class HttpExt(private val config: Config)(implicit val system: ActorSystem) exte */ def setDefaultClientHttpsContext(context: HttpsConnectionContext): Unit = synchronized { - _defaultClientHttpsContext = context + _defaultClientHttpsConnectionContext = context } // every ActorSystem maintains its own connection pools @@ -754,15 +769,17 @@ trait DefaultSSLContextCreation { defaultParams.setCipherSuites(cipherSuites) // auth! + import com.typesafe.sslconfig.ssl.{ ClientAuth ⇒ SslClientAuth } val clientAuth = config.sslParametersConfig.clientAuth match { - case ClientAuth.Default ⇒ None - case auth ⇒ Some(auth) + case SslClientAuth.Default ⇒ None + case SslClientAuth.Want ⇒ Some(ClientAuth.Want) + case SslClientAuth.Need ⇒ Some(ClientAuth.Need) + case SslClientAuth.None ⇒ Some(ClientAuth.None) } // hostname! defaultParams.setEndpointIdentificationAlgorithm("https") - // new HttpsConnectionContext(sslContext, Some(cipherSuites.toList), Some(defaultProtocols.toList), clientAuth, Some(defaultParams)) - new HttpsConnectionContext(sslContext, None, None, None, Some(defaultParams)) // previously + new HttpsConnectionContext(sslContext, Some(cipherSuites.toList), Some(defaultProtocols.toList), clientAuth, Some(defaultParams)) } } diff --git a/akka-http-core/src/main/scala/akka/http/scaladsl/model/Uri.scala b/akka-http-core/src/main/scala/akka/http/scaladsl/model/Uri.scala index 9ee95ea011..ca2ee39c84 100644 --- a/akka-http-core/src/main/scala/akka/http/scaladsl/model/Uri.scala +++ b/akka-http-core/src/main/scala/akka/http/scaladsl/model/Uri.scala @@ -43,11 +43,13 @@ sealed abstract case class Uri(scheme: String, authority: Authority, path: Path, def queryString(charset: Charset = UTF8): Option[String] = rawQueryString.map(s ⇒ decode(s, charset)) /** + * INTERNAL API + * * The effective port of this Uri given the currently set authority and scheme values. * If the authority has an explicitly set port (i.e. a non-zero port value) then this port * is the effective port. Otherwise the default port for the current scheme is returned. */ - def effectivePort: Int = if (authority.port != 0) authority.port else defaultPorts(scheme) + private[akka] def effectivePort: Int = if (authority.port != 0) authority.port else defaultPorts(scheme) /** * Returns a copy of this Uri with the given components. @@ -592,7 +594,7 @@ object Uri { } } - val defaultPorts: Map[String, Int] = + private val defaultPorts: Map[String, Int] = Map("ftp" -> 21, "ssh" -> 22, "telnet" -> 23, "smtp" -> 25, "domain" -> 53, "tftp" -> 69, "http" -> 80, "ws" -> 80, "pop3" -> 110, "nntp" -> 119, "imap" -> 143, "snmp" -> 161, "ldap" -> 389, "https" -> 443, "wss" -> 443, "imaps" -> 993, "nfs" -> 2049).withDefaultValue(-1) diff --git a/akka-http-core/src/test/scala/akka/http/impl/engine/client/ClientCancellationSpec.scala b/akka-http-core/src/test/scala/akka/http/impl/engine/client/ClientCancellationSpec.scala index 3ddb661074..e003ede18c 100644 --- a/akka-http-core/src/test/scala/akka/http/impl/engine/client/ClientCancellationSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/impl/engine/client/ClientCancellationSpec.scala @@ -57,7 +57,7 @@ class ClientCancellationSpec extends AkkaSpec(""" "support cancellation in simple outgoing connection with TLS" in { pending testCase( - Http().outgoingConnectionTls(addressTls.getHostName, addressTls.getPort)) + Http().outgoingConnectionHttps(addressTls.getHostName, addressTls.getPort)) } "support cancellation in pooled outgoing connection with TLS" in { @@ -65,7 +65,7 @@ class ClientCancellationSpec extends AkkaSpec(""" testCase( Flow[HttpRequest] .map((_, ())) - .via(Http().cachedHostConnectionPoolTls(addressTls.getHostName, addressTls.getPort)(noncheckedMaterializer)) + .via(Http().cachedHostConnectionPoolHttps(addressTls.getHostName, addressTls.getPort)(noncheckedMaterializer)) .map(_._1.get)) } diff --git a/akka-http-core/src/test/scala/akka/http/scaladsl/TestClient.scala b/akka-http-core/src/test/scala/akka/http/scaladsl/TestClient.scala index 58baae1ff8..44bbe1fdda 100644 --- a/akka-http-core/src/test/scala/akka/http/scaladsl/TestClient.scala +++ b/akka-http-core/src/test/scala/akka/http/scaladsl/TestClient.scala @@ -33,7 +33,7 @@ object TestClient extends App { def fetchServerVersion1(): Unit = { println(s"Fetching HTTPS server version of host `$host` via a direct low-level connection ...") - val connection = Http().outgoingConnectionTls(host) + val connection = Http().outgoingConnectionHttps(host) val result = Source.single(HttpRequest()).via(connection).runWith(Sink.head) result.map(_.header[headers.Server]) onComplete { case Success(res) ⇒ diff --git a/akka-http-tests/src/main/java/akka/http/javadsl/server/examples/petstore/PetStoreExample.java b/akka-http-tests/src/main/java/akka/http/javadsl/server/examples/petstore/PetStoreExample.java index 66ac808762..939ea153a7 100644 --- a/akka-http-tests/src/main/java/akka/http/javadsl/server/examples/petstore/PetStoreExample.java +++ b/akka-http-tests/src/main/java/akka/http/javadsl/server/examples/petstore/PetStoreExample.java @@ -17,55 +17,53 @@ import java.util.concurrent.ConcurrentHashMap; import static akka.http.javadsl.server.Directives.*; public class PetStoreExample { - static PathMatcher petId = PathMatchers.intValue(); - static RequestVal petEntity = RequestVals.entityAs(Jackson.jsonAs(Pet.class)); + static PathMatcher petId = PathMatchers.intValue(); + static RequestVal petEntity = RequestVals.entityAs(Jackson.jsonAs(Pet.class)); - public static Route appRoute(final Map pets) { - PetStoreController controller = new PetStoreController(pets); + public static Route appRoute(final Map pets) { + PetStoreController controller = new PetStoreController(pets); - final RequestVal existingPet = RequestVals.lookupInMap(petId, Pet.class, pets); + final RequestVal existingPet = RequestVals.lookupInMap(petId, Pet.class, pets); - Handler1 putPetHandler = new Handler1() { - public RouteResult apply(RequestContext ctx, Pet thePet) { - pets.put(thePet.getId(), thePet); - return ctx.completeAs(Jackson.json(), thePet); - } - }; + Handler1 putPetHandler = (Handler1) (ctx, thePet) -> { + pets.put(thePet.getId(), thePet); + return ctx.completeAs(Jackson.json(), thePet); + }; - return - route( - path().route( - getFromResource("web/index.html") - ), - path("pet", petId).route( - // demonstrates three different ways of handling requests: + return + route( + path().route( + getFromResource("web/index.html") + ), + path("pet", petId).route( + // demonstrates three different ways of handling requests: - // 1. using a predefined route that completes with an extraction - get(extractAndComplete(Jackson.json(), existingPet)), + // 1. using a predefined route that completes with an extraction + get(extractAndComplete(Jackson.json(), existingPet)), - // 2. using a handler - put(handleWith1(petEntity, putPetHandler)), + // 2. using a handler + put(handleWith1(petEntity, putPetHandler)), - // 3. calling a method of a controller instance reflectively - delete(handleReflectively(controller, "deletePet", petId)) - ) - ); - } - - public static void main(String[] args) throws IOException { - Map pets = new ConcurrentHashMap(); - Pet dog = new Pet(0, "dog"); - Pet cat = new Pet(1, "cat"); - pets.put(0, dog); - pets.put(1, cat); - - ActorSystem system = ActorSystem.create(); - try { - HttpService.bindRoute("localhost", 8080, appRoute(pets), system); - System.out.println("Type RETURN to exit"); - System.in.read(); - } finally { - system.terminate(); - } + // 3. calling a method of a controller instance reflectively + delete(handleReflectively(controller, "deletePet", petId)) + ) + ); + } + + public static void main(String[] args) throws IOException { + Map pets = new ConcurrentHashMap<>(); + Pet dog = new Pet(0, "dog"); + Pet cat = new Pet(1, "cat"); + pets.put(0, dog); + pets.put(1, cat); + + ActorSystem system = ActorSystem.create(); + try { + HttpService.bindRoute("localhost", 8080, appRoute(pets), system); + System.out.println("Type RETURN to exit"); + System.in.read(); + } finally { + system.terminate(); } + } } \ No newline at end of file diff --git a/akka-http-tests/src/test/java/akka/http/javadsl/client/HttpAPIsTest.java b/akka-http-tests/src/test/java/akka/http/javadsl/client/HttpAPIsTest.java new file mode 100644 index 0000000000..e85bfbbc1b --- /dev/null +++ b/akka-http-tests/src/test/java/akka/http/javadsl/client/HttpAPIsTest.java @@ -0,0 +1,91 @@ +/* + * Copyright (C) 2009-2014 Typesafe Inc. + */ + +package akka.http.javadsl.client; + +import akka.event.LoggingAdapter; +import akka.http.ConnectionPoolSettings; +import akka.http.javadsl.ConnectHttp; +import akka.http.javadsl.ConnectionContext; +import akka.http.javadsl.Http; +import akka.http.javadsl.HttpsConnectionContext; +import akka.http.javadsl.testkit.JUnitRouteTest; + +import javax.net.ssl.SSLContext; + +import static akka.http.javadsl.ConnectHttp.*; +import static akka.http.javadsl.ConnectHttp.toHostHttps; + +@SuppressWarnings("ConstantConditions") +public class HttpAPIsTest extends JUnitRouteTest { + + public void compileOnly() throws Exception { + final Http http = Http.get(system()); + + final HttpsConnectionContext httpsContext = ConnectionContext.https(SSLContext.getDefault()); + + String host = ""; + int port = 9090; + ConnectionPoolSettings conSettings = null; + LoggingAdapter log = null; + + http.bind("127.0.0.1", 8080, materializer()); + http.bind("127.0.0.1", 8080, httpsContext, materializer()); + + http.bindAndHandle(null, "127.0.0.1", 8080, materializer()); + http.bindAndHandle(null, "127.0.0.1", 8080, httpsContext, materializer()); + + http.bindAndHandleAsync(null, "127.0.0.1", 8080, materializer()); + http.bindAndHandleAsync(null, "127.0.0.1", 8080, httpsContext, materializer()); + + http.bindAndHandleSync(null, "127.0.0.1", 8080, materializer()); + http.bindAndHandleSync(null, "127.0.0.1", 8080, httpsContext, materializer()); + + http.singleRequest(null, materializer()); + http.singleRequest(null, httpsContext, materializer()); + http.singleRequest(null, httpsContext, conSettings, log, materializer()); + + http.outgoingConnection("akka.io"); + http.outgoingConnection("akka.io:8080"); + http.outgoingConnection("https://akka.io"); + http.outgoingConnection("https://akka.io:8081"); + + http.outgoingConnection(toHost("akka.io")); + http.outgoingConnection(toHost("akka.io", 8080)); + http.outgoingConnection(toHost("https://akka.io")); + http.outgoingConnection(toHostHttps("akka.io")); // default ssl context (ssl-config) + http.outgoingConnection(toHostHttps("ssh://akka.io")); // throws, we explicitly require https or "" + http.outgoingConnection(toHostHttps("akka.io", 8081).withCustomHttpsContext(httpsContext)); + http.outgoingConnection(toHostHttps("akka.io", 8081).withCustomHttpsContext(httpsContext).withDefaultContext()); + http.outgoingConnection(toHostHttps("akka.io", 8081).withCustomHttpsContext(httpsContext).withDefaultContext()); + + // in future we can add modify(context -> Context) to "keep ssl-config defaults, but tweak them in code) + + http.newHostConnectionPool("akka.io", materializer()); + http.newHostConnectionPool("https://akka.io", materializer()); + http.newHostConnectionPool("https://akka.io:8080", materializer()); + http.newHostConnectionPool(toHost("akka.io"), materializer()); + http.newHostConnectionPool(toHostHttps("ftp://akka.io"), materializer()); // throws, we explicitly require https or "" + http.newHostConnectionPool(toHostHttps("https://akka.io:2222"), materializer()); + http.newHostConnectionPool(toHostHttps("akka.io"), materializer()); + http.newHostConnectionPool(toHost(""), conSettings, log, materializer()); + + + http.cachedHostConnectionPool("akka.io", materializer()); + http.cachedHostConnectionPool("https://akka.io", materializer()); + http.cachedHostConnectionPool("https://akka.io:8080", materializer()); + http.cachedHostConnectionPool(toHost("akka.io"), materializer()); + http.cachedHostConnectionPool(toHostHttps("smtp://akka.io"), materializer()); // throws, we explicitly require https or "" + http.cachedHostConnectionPool(toHostHttps("https://akka.io:2222"), materializer()); + http.cachedHostConnectionPool(toHostHttps("akka.io"), materializer()); + http.cachedHostConnectionPool(toHost("akka.io"), conSettings, log, materializer()); + + http.superPool(materializer()); + http.superPool(conSettings, log, materializer()); + http.superPool(conSettings, httpsContext, log, materializer()); + + ConnectHttp.UsingHttps connect = toHostHttps("akka.io", 8081).withCustomHttpsContext(httpsContext).withDefaultContext(); + connect.connectionContext().orElse(http.defaultClientHttpsContext()); // usage by us internally + } +} \ No newline at end of file diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/SslTlsCipherActor.scala b/akka-stream/src/main/scala/akka/stream/impl/io/SslTlsCipherActor.scala index 113226546f..9e4c8d6be5 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/SslTlsCipherActor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/SslTlsCipherActor.scala @@ -15,7 +15,6 @@ import akka.stream.impl.FanOut.OutputBunch import akka.stream.impl._ import akka.util.ByteString import com.typesafe.sslconfig.akka.AkkaSSLConfig -import com.typesafe.sslconfig.ssl.ClientAuth import scala.annotation.tailrec import akka.stream.io._ diff --git a/akka-stream/src/main/scala/akka/stream/io/SslTls.scala b/akka-stream/src/main/scala/akka/stream/io/SslTls.scala index 81a19b8287..cbd6f4f37f 100644 --- a/akka-stream/src/main/scala/akka/stream/io/SslTls.scala +++ b/akka-stream/src/main/scala/akka/stream/io/SslTls.scala @@ -12,8 +12,6 @@ import akka.stream._ import akka.stream.impl.StreamLayout.Module import akka.util.ByteString import javax.net.ssl._ -import com.typesafe.sslconfig.akka.AkkaSSLConfig -import com.typesafe.sslconfig.ssl.ClientAuth import scala.annotation.varargs import scala.collection.immutable @@ -432,3 +430,23 @@ object NegotiateNewSession extends NegotiateNewSession(None, None, None, None) { * peer. */ case class SendBytes(bytes: ByteString) extends SslTlsOutbound + +/** + * An SSLEngine can either demand, allow or ignore its peer’s authentication + * (via certificates), where `Need` will fail the handshake if the peer does + * not provide valid credentials, `Want` allows the peer to send credentials + * and verifies them if provided, and `None` disables peer certificate + * verification. + * + * See the documentation for `SSLEngine::setWantClientAuth` for more information. + */ +sealed abstract class ClientAuth +object ClientAuth { + case object None extends ClientAuth + case object Want extends ClientAuth + case object Need extends ClientAuth + + def none: ClientAuth = None + def want: ClientAuth = Want + def need: ClientAuth = Need +}