diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/client/ClientConnectionSettings.scala b/akka-http-core/src/main/scala/akka/http/ClientConnectionSettings.scala similarity index 51% rename from akka-http-core/src/main/scala/akka/http/impl/engine/client/ClientConnectionSettings.scala rename to akka-http-core/src/main/scala/akka/http/ClientConnectionSettings.scala index b696f2cffc..829d3597aa 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/client/ClientConnectionSettings.scala +++ b/akka-http-core/src/main/scala/akka/http/ClientConnectionSettings.scala @@ -2,13 +2,14 @@ * Copyright (C) 2009-2014 Typesafe Inc. */ -package akka.http.impl.engine.client +package akka.http -import akka.http.ParserSettings -import com.typesafe.config.Config -import scala.concurrent.duration.{ FiniteDuration, Duration } -import akka.http.scaladsl.model.headers.`User-Agent` +import akka.actor.ActorSystem import akka.http.impl.util._ +import akka.http.scaladsl.model.headers.`User-Agent` +import com.typesafe.config.Config + +import scala.concurrent.duration.{ Duration, FiniteDuration } final case class ClientConnectionSettings( userAgentHeader: Option[`User-Agent`], @@ -30,4 +31,28 @@ object ClientConnectionSettings extends SettingsCompanion[ClientConnectionSettin c getIntBytes "request-header-size-hint", ParserSettings fromSubConfig c.getConfig("parsing")) } + + /** + * Creates an instance of ClientConnectionSettings using the configuration provided by the given + * ActorSystem. + * + * Java API + */ + def create(system: ActorSystem): ClientConnectionSettings = ClientConnectionSettings(system) + + /** + * Creates an instance of ClientConnectionSettings using the given Config. + * + * Java API + */ + def create(config: Config): ClientConnectionSettings = ClientConnectionSettings(config) + + /** + * Create an instance of ClientConnectionSettings using the given String of config overrides to override + * settings set in the class loader of this class (i.e. by application.conf or reference.conf files in + * the class loader of this class). + * + * Java API + */ + def create(configOverrides: String): ClientConnectionSettings = ClientConnectionSettings(configOverrides) } \ No newline at end of file diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/client/ConnectionPoolSettings.scala b/akka-http-core/src/main/scala/akka/http/ConnectionPoolSettings.scala similarity index 55% rename from akka-http-core/src/main/scala/akka/http/impl/engine/client/ConnectionPoolSettings.scala rename to akka-http-core/src/main/scala/akka/http/ConnectionPoolSettings.scala index 99184d9233..0d46ffd824 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/client/ConnectionPoolSettings.scala +++ b/akka-http-core/src/main/scala/akka/http/ConnectionPoolSettings.scala @@ -2,14 +2,18 @@ * Copyright (C) 2009-2014 Typesafe Inc. */ -package akka.http.impl.engine.client +package akka.http -import akka.event.LoggingAdapter -import akka.io.Inet +import java.lang.{ Iterable ⇒ JIterable } import com.typesafe.config.Config import scala.collection.immutable import scala.concurrent.duration.Duration +import akka.japi.Util._ + +import akka.actor.ActorSystem +import akka.event.LoggingAdapter import akka.http.impl.util._ +import akka.io.Inet final case class HostConnectionPoolSetup(host: String, port: Int, setup: ConnectionPoolSetup) @@ -19,6 +23,15 @@ final case class ConnectionPoolSetup( settings: ConnectionPoolSettings, log: LoggingAdapter) +object ConnectionPoolSetup { + /** Java API */ + def create(encrypted: Boolean, + options: JIterable[Inet.SocketOption], + settings: ConnectionPoolSettings, + log: LoggingAdapter): ConnectionPoolSetup = + ConnectionPoolSetup(encrypted, immutableSeq(options), settings, log) +} + final case class ConnectionPoolSettings( maxConnections: Int, maxRetries: Int, @@ -44,4 +57,28 @@ object ConnectionPoolSettings extends SettingsCompanion[ConnectionPoolSettings]( c getPotentiallyInfiniteDuration "idle-timeout", ClientConnectionSettings fromSubConfig c.getConfig("client")) } + + /** + * Creates an instance of ConnectionPoolSettings using the configuration provided by the given + * ActorSystem. + * + * Java API + */ + def create(system: ActorSystem): ConnectionPoolSettings = ConnectionPoolSettings(system) + + /** + * Creates an instance of ConnectionPoolSettings using the given Config. + * + * Java API + */ + def create(config: Config): ConnectionPoolSettings = ConnectionPoolSettings(config) + + /** + * Create an instance of ConnectionPoolSettings using the given String of config overrides to override + * settings set in the class loader of this class (i.e. by application.conf or reference.conf files in + * the class loader of this class). + * + * Java API + */ + def create(configOverrides: String): ConnectionPoolSettings = ConnectionPoolSettings(configOverrides) } \ No newline at end of file diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/client/OutgoingConnectionBlueprint.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/client/OutgoingConnectionBlueprint.scala index 85dbf6fc77..5dae6f4042 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/client/OutgoingConnectionBlueprint.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/client/OutgoingConnectionBlueprint.scala @@ -4,6 +4,8 @@ package akka.http.impl.engine.client +import akka.http.ClientConnectionSettings + import language.existentials import java.net.InetSocketAddress import scala.annotation.tailrec diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolFlow.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolFlow.scala index 3431cdb8fb..a6f162718d 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolFlow.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolFlow.scala @@ -5,6 +5,8 @@ package akka.http.impl.engine.client import java.net.InetSocketAddress +import akka.http.ConnectionPoolSettings + import scala.concurrent.{ Promise, Future } import scala.util.Try import akka.event.LoggingAdapter diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolGateway.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolGateway.scala index 2bd73af7d0..34005517e6 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolGateway.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolGateway.scala @@ -1,6 +1,8 @@ package akka.http.impl.engine.client import java.util.concurrent.atomic.AtomicReference +import akka.http.HostConnectionPoolSetup + import scala.annotation.tailrec import scala.concurrent.{ Future, Promise } import akka.actor.{ Props, ActorSystem, ActorRef } diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolInterfaceActor.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolInterfaceActor.scala index 1ed4291ebd..8935528b1d 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolInterfaceActor.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolInterfaceActor.scala @@ -5,6 +5,8 @@ package akka.http.impl.engine.client import java.net.InetSocketAddress +import akka.http.HostConnectionPoolSetup + import scala.annotation.tailrec import scala.concurrent.Promise import scala.concurrent.duration.FiniteDuration diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolSlot.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolSlot.scala index aaf4ec8b68..00a336d7ce 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolSlot.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolSlot.scala @@ -4,6 +4,8 @@ package akka.http.impl.engine.client +import akka.http.ConnectionPoolSettings + import language.existentials import java.net.InetSocketAddress import scala.util.{ Failure, Success } diff --git a/akka-http-core/src/main/scala/akka/http/javadsl/HostConnectionPool.scala b/akka-http-core/src/main/scala/akka/http/javadsl/HostConnectionPool.scala new file mode 100644 index 0000000000..b0d4e866ff --- /dev/null +++ b/akka-http-core/src/main/scala/akka/http/javadsl/HostConnectionPool.scala @@ -0,0 +1,11 @@ +/* + * Copyright (C) 2009-2015 Typesafe Inc. + */ + +package akka.http.javadsl + +import akka.http.HostConnectionPoolSetup + +trait HostConnectionPool { + def setup: HostConnectionPoolSetup +} diff --git a/akka-http-core/src/main/scala/akka/http/javadsl/Http.scala b/akka-http-core/src/main/scala/akka/http/javadsl/Http.scala index a8c4ccf92f..08927ce3f9 100644 --- a/akka-http-core/src/main/scala/akka/http/javadsl/Http.scala +++ b/akka-http-core/src/main/scala/akka/http/javadsl/Http.scala @@ -5,11 +5,13 @@ package akka.http.javadsl import java.lang.{ Iterable ⇒ JIterable } -import akka.http.ServerSettings +import java.net.InetSocketAddress +import akka.http._ +import akka.stream.scaladsl.Keep import scala.concurrent.Future import akka.japi.Util._ -import akka.japi.Function +import akka.japi.{ Option, Function } import akka.actor.{ ExtendedActorSystem, ActorSystem, ExtensionIdProvider, ExtensionId } import akka.event.LoggingAdapter import akka.io.Inet @@ -17,6 +19,9 @@ import akka.stream.FlowMaterializer import akka.stream.javadsl.{ Flow, Source } import akka.http.scaladsl.{ model ⇒ sm } import akka.http.javadsl.model._ +import akka.http.impl.util.JavaMapping.Implicits._ + +import scala.util.Try object Http extends ExtensionId[Http] with ExtensionIdProvider { override def get(system: ActorSystem): Http = super.get(system) @@ -39,8 +44,8 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension { * fail, unless the first materialization has already been unbound. Unbinding can be triggered via the materialized * [[ServerBinding]]. */ - def bind(interface: String, port: Int, fm: FlowMaterializer): Source[IncomingConnection, Future[ServerBinding]] = - Source.adapt(delegate.bind(interface, port)(fm) + def bind(interface: String, port: Int, materializer: FlowMaterializer): Source[IncomingConnection, Future[ServerBinding]] = + Source.adapt(delegate.bind(interface, port)(materializer) .map(new IncomingConnection(_)) .mapMaterialized(_.map(new ServerBinding(_))(ec))) @@ -58,8 +63,8 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension { backlog: Int, options: JIterable[Inet.SocketOption], settings: ServerSettings, log: LoggingAdapter, - fm: FlowMaterializer): Source[IncomingConnection, Future[ServerBinding]] = - Source.adapt(delegate.bind(interface, port, backlog, immutableSeq(options), settings, log)(fm) + materializer: FlowMaterializer): Source[IncomingConnection, Future[ServerBinding]] = + Source.adapt(delegate.bind(interface, port, backlog, immutableSeq(options), settings, log)(materializer) .map(new IncomingConnection(_)) .mapMaterialized(_.map(new ServerBinding(_))(ec))) @@ -73,9 +78,9 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension { */ def bindAndHandle(handler: Flow[HttpRequest, HttpResponse, _], interface: String, port: Int, - fm: FlowMaterializer): Future[ServerBinding] = + materializer: FlowMaterializer): Future[ServerBinding] = delegate.bindAndHandle(handler.asInstanceOf[Flow[sm.HttpRequest, sm.HttpResponse, _]].asScala, - interface, port)(fm) + interface, port)(materializer) .map(new ServerBinding(_))(ec) /** @@ -91,9 +96,9 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension { backlog: Int, options: JIterable[Inet.SocketOption], settings: ServerSettings, log: LoggingAdapter, - fm: FlowMaterializer): Future[ServerBinding] = + materializer: FlowMaterializer): Future[ServerBinding] = delegate.bindAndHandle(handler.asInstanceOf[Flow[sm.HttpRequest, sm.HttpResponse, _]].asScala, - interface, port, backlog, immutableSeq(options), settings, log)(fm) + interface, port, backlog, immutableSeq(options), settings, log)(materializer) .map(new ServerBinding(_))(ec) /** @@ -106,8 +111,8 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension { */ def bindAndHandleSync(handler: Function[HttpRequest, HttpResponse], interface: String, port: Int, - fm: FlowMaterializer): Future[ServerBinding] = - delegate.bindAndHandleSync(handler.apply(_).asInstanceOf[sm.HttpResponse], interface, port)(fm) + materializer: FlowMaterializer): Future[ServerBinding] = + delegate.bindAndHandleSync(handler.apply(_).asScala, interface, port)(materializer) .map(new ServerBinding(_))(ec) /** @@ -123,9 +128,9 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension { backlog: Int, options: JIterable[Inet.SocketOption], settings: ServerSettings, log: LoggingAdapter, - fm: FlowMaterializer): Future[ServerBinding] = - delegate.bindAndHandleSync(handler.apply(_).asInstanceOf[sm.HttpResponse], - interface, port, backlog, immutableSeq(options), settings, log)(fm) + materializer: FlowMaterializer): Future[ServerBinding] = + delegate.bindAndHandleSync(handler.apply(_).asScala, + interface, port, backlog, immutableSeq(options), settings, log)(materializer) .map(new ServerBinding(_))(ec) /** @@ -138,8 +143,8 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension { */ def bindAndHandleAsync(handler: Function[HttpRequest, Future[HttpResponse]], interface: String, port: Int, - fm: FlowMaterializer): Future[ServerBinding] = - delegate.bindAndHandleAsync(handler.apply(_).asInstanceOf[Future[sm.HttpResponse]], interface, port)(fm) + materializer: FlowMaterializer): Future[ServerBinding] = + delegate.bindAndHandleAsync(handler.apply(_).asInstanceOf[Future[sm.HttpResponse]], interface, port)(materializer) .map(new ServerBinding(_))(ec) /** @@ -155,8 +160,222 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension { backlog: Int, options: JIterable[Inet.SocketOption], settings: ServerSettings, log: LoggingAdapter, - fm: FlowMaterializer): Future[ServerBinding] = + materializer: FlowMaterializer): Future[ServerBinding] = delegate.bindAndHandleAsync(handler.apply(_).asInstanceOf[Future[sm.HttpResponse]], - interface, port, backlog, immutableSeq(options), settings, log)(fm) + interface, port, backlog, immutableSeq(options), settings, log)(materializer) .map(new ServerBinding(_))(ec) + + /** + * Creates a [[Flow]] representing a prospective HTTP client connection to the given endpoint. + * Every materialization of the produced flow will attempt to establish a new outgoing connection. + */ + def outgoingConnection(host: String, port: Int): Flow[HttpRequest, HttpResponse, Future[OutgoingConnection]] = + Flow.wrap { + akka.stream.scaladsl.Flow[HttpRequest].map(_.asScala) + .viaMat(delegate.outgoingConnection(host, port))(Keep.right) + .mapMaterialized(_.map(new OutgoingConnection(_))(ec)) + } + + /** + * Creates a [[Flow]] representing a prospective HTTP client connection to the given endpoint. + * Every materialization of the produced flow will attempt to establish a new outgoing connection. + */ + def outgoingConnection(host: String, port: Int, + localAddress: Option[InetSocketAddress], + options: JIterable[Inet.SocketOption], + settings: ClientConnectionSettings, + log: LoggingAdapter): Flow[HttpRequest, HttpResponse, Future[OutgoingConnection]] = + Flow.wrap { + akka.stream.scaladsl.Flow[HttpRequest].map(_.asScala) + .viaMat(delegate.outgoingConnection(host, port, localAddress.asScala, immutableSeq(options), settings, log))(Keep.right) + .mapMaterialized(_.map(new OutgoingConnection(_))(ec)) + } + + /** + * Starts a new connection pool to the given host and configuration and returns a [[Flow]] which dispatches + * the requests from all its materializations across this pool. + * While the started host connection pool internally shuts itself down automatically after the configured idle + * timeout it will spin itself up again if more requests arrive from an existing or a new client flow + * materialization. The returned flow therefore remains usable for the full lifetime of the application. + * + * Since the underlying transport usually comprises more than a single connection the produced flow might generate + * responses in an order that doesn't directly match the consumed requests. + * For example, if two requests A and B enter the flow in that order the response for B might be produced before the + * response for A. + * In order to allow for easy response-to-request association the flow takes in a custom, opaque context + * object of type ``T`` from the application which is emitted together with the corresponding response. + */ + def newHostConnectionPool[T](host: String, port: Int, materializer: FlowMaterializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] = + adaptTupleFlow(delegate.newHostConnectionPool[T](host, port)(materializer)) + + /** + * Starts a new connection pool to the given host and configuration and returns a [[Flow]] which dispatches + * the requests from all its materializations across this pool. + * While the started host connection pool internally shuts itself down automatically after the configured idle + * timeout it will spin itself up again if more requests arrive from an existing or a new client flow + * materialization. The returned flow therefore remains usable for the full lifetime of the application. + * + * Since the underlying transport usually comprises more than a single connection the produced flow might generate + * responses in an order that doesn't directly match the consumed requests. + * For example, if two requests A and B enter the flow in that order the response for B might be produced before the + * response for A. + * In order to allow for easy response-to-request association the flow takes in a custom, opaque context + * object of type ``T`` from the application which is emitted together with the corresponding response. + */ + def newHostConnectionPool[T](host: String, port: Int, + options: JIterable[Inet.SocketOption], + settings: ConnectionPoolSettings, + log: LoggingAdapter, materializer: FlowMaterializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] = + adaptTupleFlow(delegate.newHostConnectionPool[T](host, port, immutableSeq(options), settings, log)(materializer)) + + /** + * Starts a new connection pool to the given host and configuration and returns a [[Flow]] which dispatches + * the requests from all its materializations across this pool. + * While the started host connection pool internally shuts itself down automatically after the configured idle + * timeout it will spin itself up again if more requests arrive from an existing or a new client flow + * materialization. The returned flow therefore remains usable for the full lifetime of the application. + * + * Since the underlying transport usually comprises more than a single connection the produced flow might generate + * responses in an order that doesn't directly match the consumed requests. + * For example, if two requests A and B enter the flow in that order the response for B might be produced before the + * response for A. + * In order to allow for easy response-to-request association the flow takes in a custom, opaque context + * object of type ``T`` from the application which is emitted together with the corresponding response. + */ + def newHostConnectionPool[T](setup: HostConnectionPoolSetup, materializer: FlowMaterializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] = + adaptTupleFlow(delegate.newHostConnectionPool[T](setup)(materializer)) + + /** + * Returns a [[Flow]] which dispatches incoming HTTP requests to the per-ActorSystem pool of outgoing + * HTTP connections to the given target host endpoint. For every ActorSystem, target host and pool + * configuration a separate connection pool is maintained. + * The HTTP layer transparently manages idle shutdown and restarting of connections pools as configured. + * The returned [[Flow]] instances therefore remain valid throughout the lifetime of the application. + * + * The internal caching logic guarantees that there will never be more than a single pool running for the + * given target host endpoint and configuration (in this ActorSystem). + * + * Since the underlying transport usually comprises more than a single connection the produced flow might generate + * responses in an order that doesn't directly match the consumed requests. + * For example, if two requests A and B enter the flow in that order the response for B might be produced before the + * response for A. + * In order to allow for easy response-to-request association the flow takes in a custom, opaque context + * object of type ``T`` from the application which is emitted together with the corresponding response. + */ + def cachedHostConnectionPool[T](host: String, port: Int, materializer: FlowMaterializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] = + adaptTupleFlow(delegate.cachedHostConnectionPool[T](host, port)(materializer)) + + /** + * Returns a [[Flow]] which dispatches incoming HTTP requests to the per-ActorSystem pool of outgoing + * HTTP connections to the given target host endpoint. For every ActorSystem, target host and pool + * configuration a separate connection pool is maintained. + * The HTTP layer transparently manages idle shutdown and restarting of connections pools as configured. + * The returned [[Flow]] instances therefore remain valid throughout the lifetime of the application. + * + * The internal caching logic guarantees that there will never be more than a single pool running for the + * given target host endpoint and configuration (in this ActorSystem). + * + * Since the underlying transport usually comprises more than a single connection the produced flow might generate + * responses in an order that doesn't directly match the consumed requests. + * For example, if two requests A and B enter the flow in that order the response for B might be produced before the + * response for A. + * In order to allow for easy response-to-request association the flow takes in a custom, opaque context + * object of type ``T`` from the application which is emitted together with the corresponding response. + */ + def cachedHostConnectionPool[T](host: String, port: Int, + options: JIterable[Inet.SocketOption], + settings: ConnectionPoolSettings, + log: LoggingAdapter, materializer: FlowMaterializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] = + adaptTupleFlow(delegate.cachedHostConnectionPool[T](host, port, immutableSeq(options), settings, log)(materializer)) + + /** + * Returns a [[Flow]] which dispatches incoming HTTP requests to the per-ActorSystem pool of outgoing + * HTTP connections to the given target host endpoint. For every ActorSystem, target host and pool + * configuration a separate connection pool is maintained. + * The HTTP layer transparently manages idle shutdown and restarting of connections pools as configured. + * The returned [[Flow]] instances therefore remain valid throughout the lifetime of the application. + * + * The internal caching logic guarantees that there will never be more than a single pool running for the + * given target host endpoint and configuration (in this ActorSystem). + * + * Since the underlying transport usually comprises more than a single connection the produced flow might generate + * responses in an order that doesn't directly match the consumed requests. + * For example, if two requests A and B enter the flow in that order the response for B might be produced before the + * response for A. + * In order to allow for easy response-to-request association the flow takes in a custom, opaque context + * object of type ``T`` from the application which is emitted together with the corresponding response. + */ + def cachedHostConnectionPool[T](setup: HostConnectionPoolSetup, materializer: FlowMaterializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] = + adaptTupleFlow(delegate.cachedHostConnectionPool[T](setup)(materializer)) + + /** + * Creates a new "super connection pool flow", which routes incoming requests to a (cached) host connection pool + * depending on their respective effective URI. Note that incoming requests must have either an absolute URI or + * a valid `Host` header. + * + * Since the underlying transport usually comprises more than a single connection the produced flow might generate + * responses in an order that doesn't directly match the consumed requests. + * For example, if two requests A and B enter the flow in that order the response for B might be produced before the + * response for A. + * In order to allow for easy response-to-request association the flow takes in a custom, opaque context + * object of type ``T`` from the application which is emitted together with the corresponding response. + */ + def superPool[T](materializer: FlowMaterializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), Unit] = + adaptTupleFlow(delegate.superPool[T]()(materializer)) + + /** + * Creates a new "super connection pool flow", which routes incoming requests to a (cached) host connection pool + * depending on their respective effective URI. Note that incoming requests must have either an absolute URI or + * a valid `Host` header. + * + * Since the underlying transport usually comprises more than a single connection the produced flow might generate + * responses in an order that doesn't directly match the consumed requests. + * For example, if two requests A and B enter the flow in that order the response for B might be produced before the + * response for A. + * In order to allow for easy response-to-request association the flow takes in a custom, opaque context + * object of type ``T`` from the application which is emitted together with the corresponding response. + */ + def superPool[T](options: JIterable[Inet.SocketOption], + settings: ConnectionPoolSettings, + log: LoggingAdapter, materializer: FlowMaterializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), Unit] = + adaptTupleFlow(delegate.superPool[T](immutableSeq(options), settings, log)(materializer)) + + /** + * Fires a single [[HttpRequest]] across the (cached) host connection pool for the request's + * effective URI to produce a response future. + * + * Note that the request must have either an absolute URI or a valid `Host` header, otherwise + * the future will be completed with an error. + */ + def singleRequest(request: HttpRequest, materializer: FlowMaterializer): Future[HttpResponse] = + delegate.singleRequest(request.asScala)(materializer) + + /** + * Fires a single [[HttpRequest]] across the (cached) host connection pool for the request's + * effective URI to produce a response future. + * + * Note that the request must have either an absolute URI or a valid `Host` header, otherwise + * the future will be completed with an error. + */ + def singleRequest(request: HttpRequest, + options: JIterable[Inet.SocketOption], + settings: ConnectionPoolSettings, + log: LoggingAdapter, materializer: FlowMaterializer): Future[HttpResponse] = + delegate.singleRequest(request.asScala, immutableSeq(options), settings, log)(materializer) + + /** + * Triggers an orderly shutdown of all host connections pools currently maintained by the [[ActorSystem]]. + * The returned future is completed when all pools that were live at the time of this method call + * have completed their shutdown process. + * + * If existing pool client flows are re-used or new ones materialized concurrently with or after this + * method call the respective connection pools will be restarted and not contribute to the returned future. + */ + def shutdownAllConnectionPools(): Future[Unit] = delegate.shutdownAllConnectionPools() + + private def adaptTupleFlow[T, Mat](scalaFlow: akka.stream.scaladsl.Flow[(scaladsl.model.HttpRequest, T), (Try[HttpResponse], T), Mat]): Flow[(HttpRequest, T), (Try[HttpResponse], T), Mat] = + Flow.wrap { + // we know that downcasting javadsl.model.HttpRequest => scaladsl.model.HttpRequest will always work + scalaFlow.asInstanceOf[akka.stream.scaladsl.Flow[(HttpRequest, T), (Try[HttpResponse], T), Mat]] + } } diff --git a/akka-http-core/src/main/scala/akka/http/javadsl/OutgoingConnection.scala b/akka-http-core/src/main/scala/akka/http/javadsl/OutgoingConnection.scala new file mode 100644 index 0000000000..407f7d1aa4 --- /dev/null +++ b/akka-http-core/src/main/scala/akka/http/javadsl/OutgoingConnection.scala @@ -0,0 +1,21 @@ +/* + * Copyright (C) 2009-2015 Typesafe Inc. + */ + +package akka.http.javadsl + +import java.net.InetSocketAddress + +import akka.http.scaladsl + +class OutgoingConnection private[http] (delegate: scaladsl.Http.OutgoingConnection) { + /** + * The local address of this connection. + */ + def localAddress: InetSocketAddress = delegate.localAddress + + /** + * The address of the remote peer. + */ + def remoteAddress: InetSocketAddress = delegate.remoteAddress +} diff --git a/akka-http-core/src/main/scala/akka/http/scaladsl/Http.scala b/akka-http-core/src/main/scala/akka/http/scaladsl/Http.scala index e97a8556e4..d38aa46981 100644 --- a/akka-http-core/src/main/scala/akka/http/scaladsl/Http.scala +++ b/akka-http-core/src/main/scala/akka/http/scaladsl/Http.scala @@ -6,7 +6,7 @@ package akka.http.scaladsl import java.net.InetSocketAddress import java.util.concurrent.ConcurrentHashMap -import akka.http.ServerSettings +import akka.http._ import com.typesafe.config.Config import scala.util.Try import scala.util.control.NonFatal @@ -446,7 +446,7 @@ object Http extends ExtensionId[HttpExt] with ExtensionIdProvider { * Represents a connection pool to a specific target host and pool configuration. */ case class HostConnectionPool(setup: HostConnectionPoolSetup)( - private[http] val gatewayFuture: Future[PoolGateway]) { // enable test access + private[http] val gatewayFuture: Future[PoolGateway]) extends javadsl.HostConnectionPool { // enable test access /** * Asynchronously triggers the shutdown of the host connection pool. diff --git a/akka-http-core/src/test/scala/akka/http/impl/engine/client/ConnectionPoolSpec.scala b/akka-http-core/src/test/scala/akka/http/impl/engine/client/ConnectionPoolSpec.scala index b50550a898..52aca0aa4c 100644 --- a/akka-http-core/src/test/scala/akka/http/impl/engine/client/ConnectionPoolSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/impl/engine/client/ConnectionPoolSpec.scala @@ -5,7 +5,7 @@ package akka.http.impl.engine.client import java.util.concurrent.atomic.AtomicInteger -import akka.http.ServerSettings +import akka.http.{ ClientConnectionSettings, ConnectionPoolSettings, ServerSettings } import scala.concurrent.Await import scala.concurrent.duration._ diff --git a/akka-http-core/src/test/scala/akka/http/impl/engine/client/LowLevelOutgoingConnectionSpec.scala b/akka-http-core/src/test/scala/akka/http/impl/engine/client/LowLevelOutgoingConnectionSpec.scala index d15ab5a601..31f7405b3e 100644 --- a/akka-http-core/src/test/scala/akka/http/impl/engine/client/LowLevelOutgoingConnectionSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/impl/engine/client/LowLevelOutgoingConnectionSpec.scala @@ -5,6 +5,7 @@ package akka.http.impl.engine.client import java.net.InetSocketAddress +import akka.http.ClientConnectionSettings import org.scalatest.Inside import akka.util.ByteString import akka.event.NoLogging diff --git a/akka-http-core/src/test/scala/akka/http/scaladsl/ClientServerSpec.scala b/akka-http-core/src/test/scala/akka/http/scaladsl/ClientServerSpec.scala index 90603ea07e..50214ab907 100644 --- a/akka-http-core/src/test/scala/akka/http/scaladsl/ClientServerSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/scaladsl/ClientServerSpec.scala @@ -6,7 +6,7 @@ package akka.http.scaladsl import java.io.{ BufferedReader, BufferedWriter, InputStreamReader, OutputStreamWriter } import java.net.Socket -import akka.http.ServerSettings +import akka.http.{ ClientConnectionSettings, ServerSettings } import com.typesafe.config.{ Config, ConfigFactory } import scala.annotation.tailrec import scala.concurrent.Await @@ -17,7 +17,6 @@ import akka.testkit.EventFilter import akka.stream.{ ActorFlowMaterializer, BindFailedException } import akka.stream.scaladsl._ import akka.stream.testkit._ -import akka.http.impl.engine.client.ClientConnectionSettings import akka.http.scaladsl.model.HttpEntity._ import akka.http.scaladsl.model.HttpMethods._ import akka.http.scaladsl.model._