+htc client-side of http-core Java API
This commit is contained in:
parent
d25b3af252
commit
df48e82e45
14 changed files with 355 additions and 32 deletions
|
|
@ -2,13 +2,14 @@
|
||||||
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
|
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package akka.http.impl.engine.client
|
package akka.http
|
||||||
|
|
||||||
import akka.http.ParserSettings
|
import akka.actor.ActorSystem
|
||||||
import com.typesafe.config.Config
|
|
||||||
import scala.concurrent.duration.{ FiniteDuration, Duration }
|
|
||||||
import akka.http.scaladsl.model.headers.`User-Agent`
|
|
||||||
import akka.http.impl.util._
|
import akka.http.impl.util._
|
||||||
|
import akka.http.scaladsl.model.headers.`User-Agent`
|
||||||
|
import com.typesafe.config.Config
|
||||||
|
|
||||||
|
import scala.concurrent.duration.{ Duration, FiniteDuration }
|
||||||
|
|
||||||
final case class ClientConnectionSettings(
|
final case class ClientConnectionSettings(
|
||||||
userAgentHeader: Option[`User-Agent`],
|
userAgentHeader: Option[`User-Agent`],
|
||||||
|
|
@ -30,4 +31,28 @@ object ClientConnectionSettings extends SettingsCompanion[ClientConnectionSettin
|
||||||
c getIntBytes "request-header-size-hint",
|
c getIntBytes "request-header-size-hint",
|
||||||
ParserSettings fromSubConfig c.getConfig("parsing"))
|
ParserSettings fromSubConfig c.getConfig("parsing"))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates an instance of ClientConnectionSettings using the configuration provided by the given
|
||||||
|
* ActorSystem.
|
||||||
|
*
|
||||||
|
* Java API
|
||||||
|
*/
|
||||||
|
def create(system: ActorSystem): ClientConnectionSettings = ClientConnectionSettings(system)
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates an instance of ClientConnectionSettings using the given Config.
|
||||||
|
*
|
||||||
|
* Java API
|
||||||
|
*/
|
||||||
|
def create(config: Config): ClientConnectionSettings = ClientConnectionSettings(config)
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create an instance of ClientConnectionSettings using the given String of config overrides to override
|
||||||
|
* settings set in the class loader of this class (i.e. by application.conf or reference.conf files in
|
||||||
|
* the class loader of this class).
|
||||||
|
*
|
||||||
|
* Java API
|
||||||
|
*/
|
||||||
|
def create(configOverrides: String): ClientConnectionSettings = ClientConnectionSettings(configOverrides)
|
||||||
}
|
}
|
||||||
|
|
@ -2,14 +2,18 @@
|
||||||
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
|
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package akka.http.impl.engine.client
|
package akka.http
|
||||||
|
|
||||||
import akka.event.LoggingAdapter
|
import java.lang.{ Iterable ⇒ JIterable }
|
||||||
import akka.io.Inet
|
|
||||||
import com.typesafe.config.Config
|
import com.typesafe.config.Config
|
||||||
import scala.collection.immutable
|
import scala.collection.immutable
|
||||||
import scala.concurrent.duration.Duration
|
import scala.concurrent.duration.Duration
|
||||||
|
import akka.japi.Util._
|
||||||
|
|
||||||
|
import akka.actor.ActorSystem
|
||||||
|
import akka.event.LoggingAdapter
|
||||||
import akka.http.impl.util._
|
import akka.http.impl.util._
|
||||||
|
import akka.io.Inet
|
||||||
|
|
||||||
final case class HostConnectionPoolSetup(host: String, port: Int, setup: ConnectionPoolSetup)
|
final case class HostConnectionPoolSetup(host: String, port: Int, setup: ConnectionPoolSetup)
|
||||||
|
|
||||||
|
|
@ -19,6 +23,15 @@ final case class ConnectionPoolSetup(
|
||||||
settings: ConnectionPoolSettings,
|
settings: ConnectionPoolSettings,
|
||||||
log: LoggingAdapter)
|
log: LoggingAdapter)
|
||||||
|
|
||||||
|
object ConnectionPoolSetup {
|
||||||
|
/** Java API */
|
||||||
|
def create(encrypted: Boolean,
|
||||||
|
options: JIterable[Inet.SocketOption],
|
||||||
|
settings: ConnectionPoolSettings,
|
||||||
|
log: LoggingAdapter): ConnectionPoolSetup =
|
||||||
|
ConnectionPoolSetup(encrypted, immutableSeq(options), settings, log)
|
||||||
|
}
|
||||||
|
|
||||||
final case class ConnectionPoolSettings(
|
final case class ConnectionPoolSettings(
|
||||||
maxConnections: Int,
|
maxConnections: Int,
|
||||||
maxRetries: Int,
|
maxRetries: Int,
|
||||||
|
|
@ -44,4 +57,28 @@ object ConnectionPoolSettings extends SettingsCompanion[ConnectionPoolSettings](
|
||||||
c getPotentiallyInfiniteDuration "idle-timeout",
|
c getPotentiallyInfiniteDuration "idle-timeout",
|
||||||
ClientConnectionSettings fromSubConfig c.getConfig("client"))
|
ClientConnectionSettings fromSubConfig c.getConfig("client"))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates an instance of ConnectionPoolSettings using the configuration provided by the given
|
||||||
|
* ActorSystem.
|
||||||
|
*
|
||||||
|
* Java API
|
||||||
|
*/
|
||||||
|
def create(system: ActorSystem): ConnectionPoolSettings = ConnectionPoolSettings(system)
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates an instance of ConnectionPoolSettings using the given Config.
|
||||||
|
*
|
||||||
|
* Java API
|
||||||
|
*/
|
||||||
|
def create(config: Config): ConnectionPoolSettings = ConnectionPoolSettings(config)
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create an instance of ConnectionPoolSettings using the given String of config overrides to override
|
||||||
|
* settings set in the class loader of this class (i.e. by application.conf or reference.conf files in
|
||||||
|
* the class loader of this class).
|
||||||
|
*
|
||||||
|
* Java API
|
||||||
|
*/
|
||||||
|
def create(configOverrides: String): ConnectionPoolSettings = ConnectionPoolSettings(configOverrides)
|
||||||
}
|
}
|
||||||
|
|
@ -4,6 +4,8 @@
|
||||||
|
|
||||||
package akka.http.impl.engine.client
|
package akka.http.impl.engine.client
|
||||||
|
|
||||||
|
import akka.http.ClientConnectionSettings
|
||||||
|
|
||||||
import language.existentials
|
import language.existentials
|
||||||
import java.net.InetSocketAddress
|
import java.net.InetSocketAddress
|
||||||
import scala.annotation.tailrec
|
import scala.annotation.tailrec
|
||||||
|
|
|
||||||
|
|
@ -5,6 +5,8 @@
|
||||||
package akka.http.impl.engine.client
|
package akka.http.impl.engine.client
|
||||||
|
|
||||||
import java.net.InetSocketAddress
|
import java.net.InetSocketAddress
|
||||||
|
import akka.http.ConnectionPoolSettings
|
||||||
|
|
||||||
import scala.concurrent.{ Promise, Future }
|
import scala.concurrent.{ Promise, Future }
|
||||||
import scala.util.Try
|
import scala.util.Try
|
||||||
import akka.event.LoggingAdapter
|
import akka.event.LoggingAdapter
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,8 @@
|
||||||
package akka.http.impl.engine.client
|
package akka.http.impl.engine.client
|
||||||
|
|
||||||
import java.util.concurrent.atomic.AtomicReference
|
import java.util.concurrent.atomic.AtomicReference
|
||||||
|
import akka.http.HostConnectionPoolSetup
|
||||||
|
|
||||||
import scala.annotation.tailrec
|
import scala.annotation.tailrec
|
||||||
import scala.concurrent.{ Future, Promise }
|
import scala.concurrent.{ Future, Promise }
|
||||||
import akka.actor.{ Props, ActorSystem, ActorRef }
|
import akka.actor.{ Props, ActorSystem, ActorRef }
|
||||||
|
|
|
||||||
|
|
@ -5,6 +5,8 @@
|
||||||
package akka.http.impl.engine.client
|
package akka.http.impl.engine.client
|
||||||
|
|
||||||
import java.net.InetSocketAddress
|
import java.net.InetSocketAddress
|
||||||
|
import akka.http.HostConnectionPoolSetup
|
||||||
|
|
||||||
import scala.annotation.tailrec
|
import scala.annotation.tailrec
|
||||||
import scala.concurrent.Promise
|
import scala.concurrent.Promise
|
||||||
import scala.concurrent.duration.FiniteDuration
|
import scala.concurrent.duration.FiniteDuration
|
||||||
|
|
|
||||||
|
|
@ -4,6 +4,8 @@
|
||||||
|
|
||||||
package akka.http.impl.engine.client
|
package akka.http.impl.engine.client
|
||||||
|
|
||||||
|
import akka.http.ConnectionPoolSettings
|
||||||
|
|
||||||
import language.existentials
|
import language.existentials
|
||||||
import java.net.InetSocketAddress
|
import java.net.InetSocketAddress
|
||||||
import scala.util.{ Failure, Success }
|
import scala.util.{ Failure, Success }
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,11 @@
|
||||||
|
/*
|
||||||
|
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
|
||||||
|
*/
|
||||||
|
|
||||||
|
package akka.http.javadsl
|
||||||
|
|
||||||
|
import akka.http.HostConnectionPoolSetup
|
||||||
|
|
||||||
|
trait HostConnectionPool {
|
||||||
|
def setup: HostConnectionPoolSetup
|
||||||
|
}
|
||||||
|
|
@ -5,11 +5,13 @@
|
||||||
package akka.http.javadsl
|
package akka.http.javadsl
|
||||||
|
|
||||||
import java.lang.{ Iterable ⇒ JIterable }
|
import java.lang.{ Iterable ⇒ JIterable }
|
||||||
import akka.http.ServerSettings
|
import java.net.InetSocketAddress
|
||||||
|
import akka.http._
|
||||||
|
import akka.stream.scaladsl.Keep
|
||||||
|
|
||||||
import scala.concurrent.Future
|
import scala.concurrent.Future
|
||||||
import akka.japi.Util._
|
import akka.japi.Util._
|
||||||
import akka.japi.Function
|
import akka.japi.{ Option, Function }
|
||||||
import akka.actor.{ ExtendedActorSystem, ActorSystem, ExtensionIdProvider, ExtensionId }
|
import akka.actor.{ ExtendedActorSystem, ActorSystem, ExtensionIdProvider, ExtensionId }
|
||||||
import akka.event.LoggingAdapter
|
import akka.event.LoggingAdapter
|
||||||
import akka.io.Inet
|
import akka.io.Inet
|
||||||
|
|
@ -17,6 +19,9 @@ import akka.stream.FlowMaterializer
|
||||||
import akka.stream.javadsl.{ Flow, Source }
|
import akka.stream.javadsl.{ Flow, Source }
|
||||||
import akka.http.scaladsl.{ model ⇒ sm }
|
import akka.http.scaladsl.{ model ⇒ sm }
|
||||||
import akka.http.javadsl.model._
|
import akka.http.javadsl.model._
|
||||||
|
import akka.http.impl.util.JavaMapping.Implicits._
|
||||||
|
|
||||||
|
import scala.util.Try
|
||||||
|
|
||||||
object Http extends ExtensionId[Http] with ExtensionIdProvider {
|
object Http extends ExtensionId[Http] with ExtensionIdProvider {
|
||||||
override def get(system: ActorSystem): Http = super.get(system)
|
override def get(system: ActorSystem): Http = super.get(system)
|
||||||
|
|
@ -39,8 +44,8 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension {
|
||||||
* fail, unless the first materialization has already been unbound. Unbinding can be triggered via the materialized
|
* fail, unless the first materialization has already been unbound. Unbinding can be triggered via the materialized
|
||||||
* [[ServerBinding]].
|
* [[ServerBinding]].
|
||||||
*/
|
*/
|
||||||
def bind(interface: String, port: Int, fm: FlowMaterializer): Source[IncomingConnection, Future[ServerBinding]] =
|
def bind(interface: String, port: Int, materializer: FlowMaterializer): Source[IncomingConnection, Future[ServerBinding]] =
|
||||||
Source.adapt(delegate.bind(interface, port)(fm)
|
Source.adapt(delegate.bind(interface, port)(materializer)
|
||||||
.map(new IncomingConnection(_))
|
.map(new IncomingConnection(_))
|
||||||
.mapMaterialized(_.map(new ServerBinding(_))(ec)))
|
.mapMaterialized(_.map(new ServerBinding(_))(ec)))
|
||||||
|
|
||||||
|
|
@ -58,8 +63,8 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension {
|
||||||
backlog: Int, options: JIterable[Inet.SocketOption],
|
backlog: Int, options: JIterable[Inet.SocketOption],
|
||||||
settings: ServerSettings,
|
settings: ServerSettings,
|
||||||
log: LoggingAdapter,
|
log: LoggingAdapter,
|
||||||
fm: FlowMaterializer): Source[IncomingConnection, Future[ServerBinding]] =
|
materializer: FlowMaterializer): Source[IncomingConnection, Future[ServerBinding]] =
|
||||||
Source.adapt(delegate.bind(interface, port, backlog, immutableSeq(options), settings, log)(fm)
|
Source.adapt(delegate.bind(interface, port, backlog, immutableSeq(options), settings, log)(materializer)
|
||||||
.map(new IncomingConnection(_))
|
.map(new IncomingConnection(_))
|
||||||
.mapMaterialized(_.map(new ServerBinding(_))(ec)))
|
.mapMaterialized(_.map(new ServerBinding(_))(ec)))
|
||||||
|
|
||||||
|
|
@ -73,9 +78,9 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension {
|
||||||
*/
|
*/
|
||||||
def bindAndHandle(handler: Flow[HttpRequest, HttpResponse, _],
|
def bindAndHandle(handler: Flow[HttpRequest, HttpResponse, _],
|
||||||
interface: String, port: Int,
|
interface: String, port: Int,
|
||||||
fm: FlowMaterializer): Future[ServerBinding] =
|
materializer: FlowMaterializer): Future[ServerBinding] =
|
||||||
delegate.bindAndHandle(handler.asInstanceOf[Flow[sm.HttpRequest, sm.HttpResponse, _]].asScala,
|
delegate.bindAndHandle(handler.asInstanceOf[Flow[sm.HttpRequest, sm.HttpResponse, _]].asScala,
|
||||||
interface, port)(fm)
|
interface, port)(materializer)
|
||||||
.map(new ServerBinding(_))(ec)
|
.map(new ServerBinding(_))(ec)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -91,9 +96,9 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension {
|
||||||
backlog: Int, options: JIterable[Inet.SocketOption],
|
backlog: Int, options: JIterable[Inet.SocketOption],
|
||||||
settings: ServerSettings,
|
settings: ServerSettings,
|
||||||
log: LoggingAdapter,
|
log: LoggingAdapter,
|
||||||
fm: FlowMaterializer): Future[ServerBinding] =
|
materializer: FlowMaterializer): Future[ServerBinding] =
|
||||||
delegate.bindAndHandle(handler.asInstanceOf[Flow[sm.HttpRequest, sm.HttpResponse, _]].asScala,
|
delegate.bindAndHandle(handler.asInstanceOf[Flow[sm.HttpRequest, sm.HttpResponse, _]].asScala,
|
||||||
interface, port, backlog, immutableSeq(options), settings, log)(fm)
|
interface, port, backlog, immutableSeq(options), settings, log)(materializer)
|
||||||
.map(new ServerBinding(_))(ec)
|
.map(new ServerBinding(_))(ec)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -106,8 +111,8 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension {
|
||||||
*/
|
*/
|
||||||
def bindAndHandleSync(handler: Function[HttpRequest, HttpResponse],
|
def bindAndHandleSync(handler: Function[HttpRequest, HttpResponse],
|
||||||
interface: String, port: Int,
|
interface: String, port: Int,
|
||||||
fm: FlowMaterializer): Future[ServerBinding] =
|
materializer: FlowMaterializer): Future[ServerBinding] =
|
||||||
delegate.bindAndHandleSync(handler.apply(_).asInstanceOf[sm.HttpResponse], interface, port)(fm)
|
delegate.bindAndHandleSync(handler.apply(_).asScala, interface, port)(materializer)
|
||||||
.map(new ServerBinding(_))(ec)
|
.map(new ServerBinding(_))(ec)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -123,9 +128,9 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension {
|
||||||
backlog: Int, options: JIterable[Inet.SocketOption],
|
backlog: Int, options: JIterable[Inet.SocketOption],
|
||||||
settings: ServerSettings,
|
settings: ServerSettings,
|
||||||
log: LoggingAdapter,
|
log: LoggingAdapter,
|
||||||
fm: FlowMaterializer): Future[ServerBinding] =
|
materializer: FlowMaterializer): Future[ServerBinding] =
|
||||||
delegate.bindAndHandleSync(handler.apply(_).asInstanceOf[sm.HttpResponse],
|
delegate.bindAndHandleSync(handler.apply(_).asScala,
|
||||||
interface, port, backlog, immutableSeq(options), settings, log)(fm)
|
interface, port, backlog, immutableSeq(options), settings, log)(materializer)
|
||||||
.map(new ServerBinding(_))(ec)
|
.map(new ServerBinding(_))(ec)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -138,8 +143,8 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension {
|
||||||
*/
|
*/
|
||||||
def bindAndHandleAsync(handler: Function[HttpRequest, Future[HttpResponse]],
|
def bindAndHandleAsync(handler: Function[HttpRequest, Future[HttpResponse]],
|
||||||
interface: String, port: Int,
|
interface: String, port: Int,
|
||||||
fm: FlowMaterializer): Future[ServerBinding] =
|
materializer: FlowMaterializer): Future[ServerBinding] =
|
||||||
delegate.bindAndHandleAsync(handler.apply(_).asInstanceOf[Future[sm.HttpResponse]], interface, port)(fm)
|
delegate.bindAndHandleAsync(handler.apply(_).asInstanceOf[Future[sm.HttpResponse]], interface, port)(materializer)
|
||||||
.map(new ServerBinding(_))(ec)
|
.map(new ServerBinding(_))(ec)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -155,8 +160,222 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension {
|
||||||
backlog: Int, options: JIterable[Inet.SocketOption],
|
backlog: Int, options: JIterable[Inet.SocketOption],
|
||||||
settings: ServerSettings,
|
settings: ServerSettings,
|
||||||
log: LoggingAdapter,
|
log: LoggingAdapter,
|
||||||
fm: FlowMaterializer): Future[ServerBinding] =
|
materializer: FlowMaterializer): Future[ServerBinding] =
|
||||||
delegate.bindAndHandleAsync(handler.apply(_).asInstanceOf[Future[sm.HttpResponse]],
|
delegate.bindAndHandleAsync(handler.apply(_).asInstanceOf[Future[sm.HttpResponse]],
|
||||||
interface, port, backlog, immutableSeq(options), settings, log)(fm)
|
interface, port, backlog, immutableSeq(options), settings, log)(materializer)
|
||||||
.map(new ServerBinding(_))(ec)
|
.map(new ServerBinding(_))(ec)
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates a [[Flow]] representing a prospective HTTP client connection to the given endpoint.
|
||||||
|
* Every materialization of the produced flow will attempt to establish a new outgoing connection.
|
||||||
|
*/
|
||||||
|
def outgoingConnection(host: String, port: Int): Flow[HttpRequest, HttpResponse, Future[OutgoingConnection]] =
|
||||||
|
Flow.wrap {
|
||||||
|
akka.stream.scaladsl.Flow[HttpRequest].map(_.asScala)
|
||||||
|
.viaMat(delegate.outgoingConnection(host, port))(Keep.right)
|
||||||
|
.mapMaterialized(_.map(new OutgoingConnection(_))(ec))
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates a [[Flow]] representing a prospective HTTP client connection to the given endpoint.
|
||||||
|
* Every materialization of the produced flow will attempt to establish a new outgoing connection.
|
||||||
|
*/
|
||||||
|
def outgoingConnection(host: String, port: Int,
|
||||||
|
localAddress: Option[InetSocketAddress],
|
||||||
|
options: JIterable[Inet.SocketOption],
|
||||||
|
settings: ClientConnectionSettings,
|
||||||
|
log: LoggingAdapter): Flow[HttpRequest, HttpResponse, Future[OutgoingConnection]] =
|
||||||
|
Flow.wrap {
|
||||||
|
akka.stream.scaladsl.Flow[HttpRequest].map(_.asScala)
|
||||||
|
.viaMat(delegate.outgoingConnection(host, port, localAddress.asScala, immutableSeq(options), settings, log))(Keep.right)
|
||||||
|
.mapMaterialized(_.map(new OutgoingConnection(_))(ec))
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Starts a new connection pool to the given host and configuration and returns a [[Flow]] which dispatches
|
||||||
|
* the requests from all its materializations across this pool.
|
||||||
|
* While the started host connection pool internally shuts itself down automatically after the configured idle
|
||||||
|
* timeout it will spin itself up again if more requests arrive from an existing or a new client flow
|
||||||
|
* materialization. The returned flow therefore remains usable for the full lifetime of the application.
|
||||||
|
*
|
||||||
|
* Since the underlying transport usually comprises more than a single connection the produced flow might generate
|
||||||
|
* responses in an order that doesn't directly match the consumed requests.
|
||||||
|
* For example, if two requests A and B enter the flow in that order the response for B might be produced before the
|
||||||
|
* response for A.
|
||||||
|
* In order to allow for easy response-to-request association the flow takes in a custom, opaque context
|
||||||
|
* object of type ``T`` from the application which is emitted together with the corresponding response.
|
||||||
|
*/
|
||||||
|
def newHostConnectionPool[T](host: String, port: Int, materializer: FlowMaterializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] =
|
||||||
|
adaptTupleFlow(delegate.newHostConnectionPool[T](host, port)(materializer))
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Starts a new connection pool to the given host and configuration and returns a [[Flow]] which dispatches
|
||||||
|
* the requests from all its materializations across this pool.
|
||||||
|
* While the started host connection pool internally shuts itself down automatically after the configured idle
|
||||||
|
* timeout it will spin itself up again if more requests arrive from an existing or a new client flow
|
||||||
|
* materialization. The returned flow therefore remains usable for the full lifetime of the application.
|
||||||
|
*
|
||||||
|
* Since the underlying transport usually comprises more than a single connection the produced flow might generate
|
||||||
|
* responses in an order that doesn't directly match the consumed requests.
|
||||||
|
* For example, if two requests A and B enter the flow in that order the response for B might be produced before the
|
||||||
|
* response for A.
|
||||||
|
* In order to allow for easy response-to-request association the flow takes in a custom, opaque context
|
||||||
|
* object of type ``T`` from the application which is emitted together with the corresponding response.
|
||||||
|
*/
|
||||||
|
def newHostConnectionPool[T](host: String, port: Int,
|
||||||
|
options: JIterable[Inet.SocketOption],
|
||||||
|
settings: ConnectionPoolSettings,
|
||||||
|
log: LoggingAdapter, materializer: FlowMaterializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] =
|
||||||
|
adaptTupleFlow(delegate.newHostConnectionPool[T](host, port, immutableSeq(options), settings, log)(materializer))
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Starts a new connection pool to the given host and configuration and returns a [[Flow]] which dispatches
|
||||||
|
* the requests from all its materializations across this pool.
|
||||||
|
* While the started host connection pool internally shuts itself down automatically after the configured idle
|
||||||
|
* timeout it will spin itself up again if more requests arrive from an existing or a new client flow
|
||||||
|
* materialization. The returned flow therefore remains usable for the full lifetime of the application.
|
||||||
|
*
|
||||||
|
* Since the underlying transport usually comprises more than a single connection the produced flow might generate
|
||||||
|
* responses in an order that doesn't directly match the consumed requests.
|
||||||
|
* For example, if two requests A and B enter the flow in that order the response for B might be produced before the
|
||||||
|
* response for A.
|
||||||
|
* In order to allow for easy response-to-request association the flow takes in a custom, opaque context
|
||||||
|
* object of type ``T`` from the application which is emitted together with the corresponding response.
|
||||||
|
*/
|
||||||
|
def newHostConnectionPool[T](setup: HostConnectionPoolSetup, materializer: FlowMaterializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] =
|
||||||
|
adaptTupleFlow(delegate.newHostConnectionPool[T](setup)(materializer))
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns a [[Flow]] which dispatches incoming HTTP requests to the per-ActorSystem pool of outgoing
|
||||||
|
* HTTP connections to the given target host endpoint. For every ActorSystem, target host and pool
|
||||||
|
* configuration a separate connection pool is maintained.
|
||||||
|
* The HTTP layer transparently manages idle shutdown and restarting of connections pools as configured.
|
||||||
|
* The returned [[Flow]] instances therefore remain valid throughout the lifetime of the application.
|
||||||
|
*
|
||||||
|
* The internal caching logic guarantees that there will never be more than a single pool running for the
|
||||||
|
* given target host endpoint and configuration (in this ActorSystem).
|
||||||
|
*
|
||||||
|
* Since the underlying transport usually comprises more than a single connection the produced flow might generate
|
||||||
|
* responses in an order that doesn't directly match the consumed requests.
|
||||||
|
* For example, if two requests A and B enter the flow in that order the response for B might be produced before the
|
||||||
|
* response for A.
|
||||||
|
* In order to allow for easy response-to-request association the flow takes in a custom, opaque context
|
||||||
|
* object of type ``T`` from the application which is emitted together with the corresponding response.
|
||||||
|
*/
|
||||||
|
def cachedHostConnectionPool[T](host: String, port: Int, materializer: FlowMaterializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] =
|
||||||
|
adaptTupleFlow(delegate.cachedHostConnectionPool[T](host, port)(materializer))
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns a [[Flow]] which dispatches incoming HTTP requests to the per-ActorSystem pool of outgoing
|
||||||
|
* HTTP connections to the given target host endpoint. For every ActorSystem, target host and pool
|
||||||
|
* configuration a separate connection pool is maintained.
|
||||||
|
* The HTTP layer transparently manages idle shutdown and restarting of connections pools as configured.
|
||||||
|
* The returned [[Flow]] instances therefore remain valid throughout the lifetime of the application.
|
||||||
|
*
|
||||||
|
* The internal caching logic guarantees that there will never be more than a single pool running for the
|
||||||
|
* given target host endpoint and configuration (in this ActorSystem).
|
||||||
|
*
|
||||||
|
* Since the underlying transport usually comprises more than a single connection the produced flow might generate
|
||||||
|
* responses in an order that doesn't directly match the consumed requests.
|
||||||
|
* For example, if two requests A and B enter the flow in that order the response for B might be produced before the
|
||||||
|
* response for A.
|
||||||
|
* In order to allow for easy response-to-request association the flow takes in a custom, opaque context
|
||||||
|
* object of type ``T`` from the application which is emitted together with the corresponding response.
|
||||||
|
*/
|
||||||
|
def cachedHostConnectionPool[T](host: String, port: Int,
|
||||||
|
options: JIterable[Inet.SocketOption],
|
||||||
|
settings: ConnectionPoolSettings,
|
||||||
|
log: LoggingAdapter, materializer: FlowMaterializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] =
|
||||||
|
adaptTupleFlow(delegate.cachedHostConnectionPool[T](host, port, immutableSeq(options), settings, log)(materializer))
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns a [[Flow]] which dispatches incoming HTTP requests to the per-ActorSystem pool of outgoing
|
||||||
|
* HTTP connections to the given target host endpoint. For every ActorSystem, target host and pool
|
||||||
|
* configuration a separate connection pool is maintained.
|
||||||
|
* The HTTP layer transparently manages idle shutdown and restarting of connections pools as configured.
|
||||||
|
* The returned [[Flow]] instances therefore remain valid throughout the lifetime of the application.
|
||||||
|
*
|
||||||
|
* The internal caching logic guarantees that there will never be more than a single pool running for the
|
||||||
|
* given target host endpoint and configuration (in this ActorSystem).
|
||||||
|
*
|
||||||
|
* Since the underlying transport usually comprises more than a single connection the produced flow might generate
|
||||||
|
* responses in an order that doesn't directly match the consumed requests.
|
||||||
|
* For example, if two requests A and B enter the flow in that order the response for B might be produced before the
|
||||||
|
* response for A.
|
||||||
|
* In order to allow for easy response-to-request association the flow takes in a custom, opaque context
|
||||||
|
* object of type ``T`` from the application which is emitted together with the corresponding response.
|
||||||
|
*/
|
||||||
|
def cachedHostConnectionPool[T](setup: HostConnectionPoolSetup, materializer: FlowMaterializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] =
|
||||||
|
adaptTupleFlow(delegate.cachedHostConnectionPool[T](setup)(materializer))
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates a new "super connection pool flow", which routes incoming requests to a (cached) host connection pool
|
||||||
|
* depending on their respective effective URI. Note that incoming requests must have either an absolute URI or
|
||||||
|
* a valid `Host` header.
|
||||||
|
*
|
||||||
|
* Since the underlying transport usually comprises more than a single connection the produced flow might generate
|
||||||
|
* responses in an order that doesn't directly match the consumed requests.
|
||||||
|
* For example, if two requests A and B enter the flow in that order the response for B might be produced before the
|
||||||
|
* response for A.
|
||||||
|
* In order to allow for easy response-to-request association the flow takes in a custom, opaque context
|
||||||
|
* object of type ``T`` from the application which is emitted together with the corresponding response.
|
||||||
|
*/
|
||||||
|
def superPool[T](materializer: FlowMaterializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), Unit] =
|
||||||
|
adaptTupleFlow(delegate.superPool[T]()(materializer))
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates a new "super connection pool flow", which routes incoming requests to a (cached) host connection pool
|
||||||
|
* depending on their respective effective URI. Note that incoming requests must have either an absolute URI or
|
||||||
|
* a valid `Host` header.
|
||||||
|
*
|
||||||
|
* Since the underlying transport usually comprises more than a single connection the produced flow might generate
|
||||||
|
* responses in an order that doesn't directly match the consumed requests.
|
||||||
|
* For example, if two requests A and B enter the flow in that order the response for B might be produced before the
|
||||||
|
* response for A.
|
||||||
|
* In order to allow for easy response-to-request association the flow takes in a custom, opaque context
|
||||||
|
* object of type ``T`` from the application which is emitted together with the corresponding response.
|
||||||
|
*/
|
||||||
|
def superPool[T](options: JIterable[Inet.SocketOption],
|
||||||
|
settings: ConnectionPoolSettings,
|
||||||
|
log: LoggingAdapter, materializer: FlowMaterializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), Unit] =
|
||||||
|
adaptTupleFlow(delegate.superPool[T](immutableSeq(options), settings, log)(materializer))
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Fires a single [[HttpRequest]] across the (cached) host connection pool for the request's
|
||||||
|
* effective URI to produce a response future.
|
||||||
|
*
|
||||||
|
* Note that the request must have either an absolute URI or a valid `Host` header, otherwise
|
||||||
|
* the future will be completed with an error.
|
||||||
|
*/
|
||||||
|
def singleRequest(request: HttpRequest, materializer: FlowMaterializer): Future[HttpResponse] =
|
||||||
|
delegate.singleRequest(request.asScala)(materializer)
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Fires a single [[HttpRequest]] across the (cached) host connection pool for the request's
|
||||||
|
* effective URI to produce a response future.
|
||||||
|
*
|
||||||
|
* Note that the request must have either an absolute URI or a valid `Host` header, otherwise
|
||||||
|
* the future will be completed with an error.
|
||||||
|
*/
|
||||||
|
def singleRequest(request: HttpRequest,
|
||||||
|
options: JIterable[Inet.SocketOption],
|
||||||
|
settings: ConnectionPoolSettings,
|
||||||
|
log: LoggingAdapter, materializer: FlowMaterializer): Future[HttpResponse] =
|
||||||
|
delegate.singleRequest(request.asScala, immutableSeq(options), settings, log)(materializer)
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Triggers an orderly shutdown of all host connections pools currently maintained by the [[ActorSystem]].
|
||||||
|
* The returned future is completed when all pools that were live at the time of this method call
|
||||||
|
* have completed their shutdown process.
|
||||||
|
*
|
||||||
|
* If existing pool client flows are re-used or new ones materialized concurrently with or after this
|
||||||
|
* method call the respective connection pools will be restarted and not contribute to the returned future.
|
||||||
|
*/
|
||||||
|
def shutdownAllConnectionPools(): Future[Unit] = delegate.shutdownAllConnectionPools()
|
||||||
|
|
||||||
|
private def adaptTupleFlow[T, Mat](scalaFlow: akka.stream.scaladsl.Flow[(scaladsl.model.HttpRequest, T), (Try[HttpResponse], T), Mat]): Flow[(HttpRequest, T), (Try[HttpResponse], T), Mat] =
|
||||||
|
Flow.wrap {
|
||||||
|
// we know that downcasting javadsl.model.HttpRequest => scaladsl.model.HttpRequest will always work
|
||||||
|
scalaFlow.asInstanceOf[akka.stream.scaladsl.Flow[(HttpRequest, T), (Try[HttpResponse], T), Mat]]
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,21 @@
|
||||||
|
/*
|
||||||
|
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
|
||||||
|
*/
|
||||||
|
|
||||||
|
package akka.http.javadsl
|
||||||
|
|
||||||
|
import java.net.InetSocketAddress
|
||||||
|
|
||||||
|
import akka.http.scaladsl
|
||||||
|
|
||||||
|
class OutgoingConnection private[http] (delegate: scaladsl.Http.OutgoingConnection) {
|
||||||
|
/**
|
||||||
|
* The local address of this connection.
|
||||||
|
*/
|
||||||
|
def localAddress: InetSocketAddress = delegate.localAddress
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The address of the remote peer.
|
||||||
|
*/
|
||||||
|
def remoteAddress: InetSocketAddress = delegate.remoteAddress
|
||||||
|
}
|
||||||
|
|
@ -6,7 +6,7 @@ package akka.http.scaladsl
|
||||||
|
|
||||||
import java.net.InetSocketAddress
|
import java.net.InetSocketAddress
|
||||||
import java.util.concurrent.ConcurrentHashMap
|
import java.util.concurrent.ConcurrentHashMap
|
||||||
import akka.http.ServerSettings
|
import akka.http._
|
||||||
import com.typesafe.config.Config
|
import com.typesafe.config.Config
|
||||||
import scala.util.Try
|
import scala.util.Try
|
||||||
import scala.util.control.NonFatal
|
import scala.util.control.NonFatal
|
||||||
|
|
@ -446,7 +446,7 @@ object Http extends ExtensionId[HttpExt] with ExtensionIdProvider {
|
||||||
* Represents a connection pool to a specific target host and pool configuration.
|
* Represents a connection pool to a specific target host and pool configuration.
|
||||||
*/
|
*/
|
||||||
case class HostConnectionPool(setup: HostConnectionPoolSetup)(
|
case class HostConnectionPool(setup: HostConnectionPoolSetup)(
|
||||||
private[http] val gatewayFuture: Future[PoolGateway]) { // enable test access
|
private[http] val gatewayFuture: Future[PoolGateway]) extends javadsl.HostConnectionPool { // enable test access
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Asynchronously triggers the shutdown of the host connection pool.
|
* Asynchronously triggers the shutdown of the host connection pool.
|
||||||
|
|
|
||||||
|
|
@ -5,7 +5,7 @@
|
||||||
package akka.http.impl.engine.client
|
package akka.http.impl.engine.client
|
||||||
|
|
||||||
import java.util.concurrent.atomic.AtomicInteger
|
import java.util.concurrent.atomic.AtomicInteger
|
||||||
import akka.http.ServerSettings
|
import akka.http.{ ClientConnectionSettings, ConnectionPoolSettings, ServerSettings }
|
||||||
|
|
||||||
import scala.concurrent.Await
|
import scala.concurrent.Await
|
||||||
import scala.concurrent.duration._
|
import scala.concurrent.duration._
|
||||||
|
|
|
||||||
|
|
@ -5,6 +5,7 @@
|
||||||
package akka.http.impl.engine.client
|
package akka.http.impl.engine.client
|
||||||
|
|
||||||
import java.net.InetSocketAddress
|
import java.net.InetSocketAddress
|
||||||
|
import akka.http.ClientConnectionSettings
|
||||||
import org.scalatest.Inside
|
import org.scalatest.Inside
|
||||||
import akka.util.ByteString
|
import akka.util.ByteString
|
||||||
import akka.event.NoLogging
|
import akka.event.NoLogging
|
||||||
|
|
|
||||||
|
|
@ -6,7 +6,7 @@ package akka.http.scaladsl
|
||||||
|
|
||||||
import java.io.{ BufferedReader, BufferedWriter, InputStreamReader, OutputStreamWriter }
|
import java.io.{ BufferedReader, BufferedWriter, InputStreamReader, OutputStreamWriter }
|
||||||
import java.net.Socket
|
import java.net.Socket
|
||||||
import akka.http.ServerSettings
|
import akka.http.{ ClientConnectionSettings, ServerSettings }
|
||||||
import com.typesafe.config.{ Config, ConfigFactory }
|
import com.typesafe.config.{ Config, ConfigFactory }
|
||||||
import scala.annotation.tailrec
|
import scala.annotation.tailrec
|
||||||
import scala.concurrent.Await
|
import scala.concurrent.Await
|
||||||
|
|
@ -17,7 +17,6 @@ import akka.testkit.EventFilter
|
||||||
import akka.stream.{ ActorFlowMaterializer, BindFailedException }
|
import akka.stream.{ ActorFlowMaterializer, BindFailedException }
|
||||||
import akka.stream.scaladsl._
|
import akka.stream.scaladsl._
|
||||||
import akka.stream.testkit._
|
import akka.stream.testkit._
|
||||||
import akka.http.impl.engine.client.ClientConnectionSettings
|
|
||||||
import akka.http.scaladsl.model.HttpEntity._
|
import akka.http.scaladsl.model.HttpEntity._
|
||||||
import akka.http.scaladsl.model.HttpMethods._
|
import akka.http.scaladsl.model.HttpMethods._
|
||||||
import akka.http.scaladsl.model._
|
import akka.http.scaladsl.model._
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue