+htc #19577 binding DSL for javadsl Http

This commit is contained in:
Konrad Malawski 2016-02-10 18:21:59 +01:00
parent f042204d8b
commit d179945a97
3 changed files with 44 additions and 6 deletions

View file

@ -15,7 +15,7 @@ abstract class ConnectHttp {
def isHttps: Boolean def isHttps: Boolean
def connectionContext: Optional[HttpsConnectionContext] def connectionContext: Optional[HttpsConnectionContext]
final def effectiveConnectionContext(fallbackContext: HttpsConnectionContext): HttpsConnectionContext = final def effectiveHttpsConnectionContext(fallbackContext: HttpsConnectionContext): HttpsConnectionContext =
connectionContext.orElse(fallbackContext) connectionContext.orElse(fallbackContext)
} }

View file

@ -80,6 +80,27 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension {
materializer: Materializer): BidiFlow[HttpResponse, SslTlsOutbound, SslTlsInbound, HttpRequest, NotUsed] = materializer: Materializer): BidiFlow[HttpResponse, SslTlsOutbound, SslTlsInbound, HttpRequest, NotUsed] =
adaptServerLayer(delegate.serverLayer(settings.asScala, remoteAddress.asScala, log)(materializer)) adaptServerLayer(delegate.serverLayer(settings.asScala, remoteAddress.asScala, log)(materializer))
/**
* Creates a [[Source]] of [[IncomingConnection]] instances which represents a prospective HTTP server binding
* on the given `endpoint`.
* If the given port is 0 the resulting source can be materialized several times. Each materialization will
* then be assigned a new local port by the operating system, which can then be retrieved by the materialized
* [[ServerBinding]].
* If the given port is non-zero subsequent materialization attempts of the produced source will immediately
* fail, unless the first materialization has already been unbound. Unbinding can be triggered via the materialized
* [[ServerBinding]].
*/
def bind(connect: ConnectHttp, materializer: Materializer): Source[IncomingConnection, CompletionStage[ServerBinding]] = {
val connectionContext =
if (connect.connectionContext.isPresent) connect.connectionContext.get()
else defaultServerHttpContext
new Source(delegate.bind(connect.host, connect.port, connectionContext.asScala)(materializer)
.map(new IncomingConnection(_))
.mapMaterializedValue(_.map(new ServerBinding(_))(ec).toJava))
}
/** /**
* Creates a [[Source]] of [[IncomingConnection]] instances which represents a prospective HTTP server binding * Creates a [[Source]] of [[IncomingConnection]] instances which represents a prospective HTTP server binding
* on the given `endpoint`. * on the given `endpoint`.
@ -326,7 +347,7 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension {
*/ */
def outgoingConnection(to: ConnectHttp): Flow[HttpRequest, HttpResponse, CompletionStage[OutgoingConnection]] = def outgoingConnection(to: ConnectHttp): Flow[HttpRequest, HttpResponse, CompletionStage[OutgoingConnection]] =
adaptOutgoingFlow { adaptOutgoingFlow {
if (to.isHttps) delegate.outgoingConnectionHttps(to.host, to.port, to.effectiveConnectionContext(defaultClientHttpsContext).asScala) if (to.isHttps) delegate.outgoingConnectionHttps(to.host, to.port, to.effectiveHttpsConnectionContext(defaultClientHttpsContext).asScala)
else delegate.outgoingConnection(to.host, to.port) else delegate.outgoingConnection(to.host, to.port)
} }
@ -389,7 +410,7 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension {
settings: ConnectionPoolSettings, settings: ConnectionPoolSettings,
log: LoggingAdapter, materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], HostConnectionPool] = log: LoggingAdapter, materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], HostConnectionPool] =
adaptTupleFlow { adaptTupleFlow {
to.effectiveConnectionContext(defaultClientHttpsContext) match { to.effectiveHttpsConnectionContext(defaultClientHttpsContext) match {
case https: HttpsConnectionContext case https: HttpsConnectionContext
delegate.newHostConnectionPoolHttps[T](to.host, to.port, https.asScala, settings.asScala, log)(materializer) delegate.newHostConnectionPoolHttps[T](to.host, to.port, https.asScala, settings.asScala, log)(materializer)
.mapMaterializedValue(_.toJava) .mapMaterializedValue(_.toJava)
@ -447,7 +468,7 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension {
def cachedHostConnectionPool[T](to: ConnectHttp, def cachedHostConnectionPool[T](to: ConnectHttp,
settings: ConnectionPoolSettings, settings: ConnectionPoolSettings,
log: LoggingAdapter, materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], HostConnectionPool] = log: LoggingAdapter, materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], HostConnectionPool] =
adaptTupleFlow(delegate.cachedHostConnectionPoolHttps[T](to.host, to.port, to.effectiveConnectionContext(defaultClientHttpsContext).asScala, settings.asScala, log)(materializer) adaptTupleFlow(delegate.cachedHostConnectionPoolHttps[T](to.host, to.port, to.effectiveHttpsConnectionContext(defaultClientHttpsContext).asScala, settings.asScala, log)(materializer)
.mapMaterializedValue(_.toJava)) .mapMaterializedValue(_.toJava))
/** /**

View file

@ -12,7 +12,6 @@ import akka.http.javadsl.testkit.JUnitRouteTest;
import akka.http.scaladsl.settings.ConnectionPoolSettings; import akka.http.scaladsl.settings.ConnectionPoolSettings;
import akka.japi.Function; import akka.japi.Function;
import akka.stream.javadsl.Flow; import akka.stream.javadsl.Flow;
import scala.concurrent.Future;
import javax.net.ssl.SSLContext; import javax.net.ssl.SSLContext;
@ -24,6 +23,7 @@ import java.util.concurrent.CompletionStage;
@SuppressWarnings("ConstantConditions") @SuppressWarnings("ConstantConditions")
public class HttpAPIsTest extends JUnitRouteTest { public class HttpAPIsTest extends JUnitRouteTest {
@SuppressWarnings("unused")
public void compileOnly() throws Exception { public void compileOnly() throws Exception {
final Http http = Http.get(system()); final Http http = Http.get(system());
@ -98,6 +98,23 @@ public class HttpAPIsTest extends JUnitRouteTest {
http.superPool(conSettings, httpsContext, log, materializer()); http.superPool(conSettings, httpsContext, log, materializer());
final ConnectWithHttps connect = toHostHttps("akka.io", 8081).withCustomHttpsContext(httpsContext).withDefaultHttpsContext(); final ConnectWithHttps connect = toHostHttps("akka.io", 8081).withCustomHttpsContext(httpsContext).withDefaultHttpsContext();
connect.effectiveConnectionContext(http.defaultClientHttpsContext()); // usage by us internally connect.effectiveHttpsConnectionContext(http.defaultClientHttpsContext()); // usage by us internally
}
@SuppressWarnings("unused")
public void compileOnlyBinding() throws Exception {
final Http http = Http.get(system());
final HttpsConnectionContext httpsConnectionContext = null;
http.bind(toHost("127.0.0.1"), materializer()); // 80
http.bind(toHost("127.0.0.1", 8080), materializer()); // 8080
http.bind(toHost("https://127.0.0.1"), materializer()); // HTTPS 443
http.bind(toHost("https://127.0.0.1", 9090), materializer()); // HTTPS 9090
http.bind(toHostHttps("127.0.0.1"), materializer()); // HTTPS 443
http.bind(toHostHttps("127.0.0.1").withCustomHttpsContext(httpsConnectionContext), materializer()); // custom HTTPS 443
http.bind(toHostHttps("http://127.0.0.1"), materializer()); // throws
} }
} }