-htp #19577 replace overloads of bind with HttpConnect

This commit is contained in:
Konrad Malawski 2016-02-15 19:40:18 +01:00
parent 2bb45ce211
commit ef827e6ef1
16 changed files with 190 additions and 206 deletions

View file

@ -42,7 +42,7 @@ trait RouterConfig extends Serializable {
/**
* Create the actual router, responsible for routing messages to routees.
*
*
* @param system the ActorSystem this router belongs to
*/
def createRouter(system: ActorSystem): Router

View file

@ -6,6 +6,8 @@ package docs.http.javadsl.server;
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.dispatch.OnFailure;
import akka.http.javadsl.ConnectHttp;
import akka.http.javadsl.Http;
import akka.http.javadsl.IncomingConnection;
import akka.http.javadsl.ServerBinding;
@ -38,7 +40,7 @@ public class HttpServerExampleDocTest {
Materializer materializer = ActorMaterializer.create(system);
Source<IncomingConnection, CompletionStage<ServerBinding>> serverSource =
Http.get(system).bind("localhost", 8080, materializer);
Http.get(system).bind(ConnectHttp.toHost("localhost", 8080), materializer);
CompletionStage<ServerBinding> serverBindingFuture =
serverSource.to(Sink.foreach(connection -> {
@ -56,7 +58,7 @@ public class HttpServerExampleDocTest {
Materializer materializer = ActorMaterializer.create(system);
Source<IncomingConnection, CompletionStage<ServerBinding>> serverSource =
Http.get(system).bind("localhost", 80, materializer);
Http.get(system).bind(ConnectHttp.toHost("localhost", 80), materializer);
CompletionStage<ServerBinding> serverBindingFuture =
serverSource.to(Sink.foreach(connection -> {
@ -78,7 +80,7 @@ public class HttpServerExampleDocTest {
Materializer materializer = ActorMaterializer.create(system);
Source<IncomingConnection, CompletionStage<ServerBinding>> serverSource =
Http.get(system).bind("localhost", 8080, materializer);
Http.get(system).bind(ConnectHttp.toHost("localhost", 8080), materializer);
Flow<IncomingConnection, IncomingConnection, NotUsed> failureDetection =
Flow.of(IncomingConnection.class).watchTermination((notUsed, termination) -> {
@ -108,7 +110,7 @@ public class HttpServerExampleDocTest {
Materializer materializer = ActorMaterializer.create(system);
Source<IncomingConnection, CompletionStage<ServerBinding>> serverSource =
Http.get(system).bind("localhost", 8080, materializer);
Http.get(system).bind(ConnectHttp.toHost("localhost", 8080), materializer);
Flow<HttpRequest, HttpRequest, NotUsed> failureDetection =
Flow.of(HttpRequest.class)
@ -151,7 +153,7 @@ public class HttpServerExampleDocTest {
final Materializer materializer = ActorMaterializer.create(system);
Source<IncomingConnection, CompletionStage<ServerBinding>> serverSource =
Http.get(system).bind("localhost", 8080, materializer);
Http.get(system).bind(ConnectHttp.toHost("localhost", 8080), materializer);
//#request-handler
final Function<HttpRequest, HttpResponse> requestHandler =

View file

@ -11,6 +11,7 @@ import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import akka.NotUsed;
import akka.http.javadsl.ConnectHttp;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
@ -57,7 +58,7 @@ public class WebSocketCoreExample {
public HttpResponse apply(HttpRequest request) throws Exception {
return handleRequest(request);
}
}, "localhost", 8080, materializer);
}, ConnectHttp.toHost("localhost", 8080), materializer);
// will throw if binding fails
serverBindingFuture.toCompletableFuture().get(1, TimeUnit.SECONDS);

View file

@ -169,4 +169,23 @@ Previously we had a mix of methods and classes called ``websocket`` or ``Websock
how the word is spelled in the spec and some other places of Akka HTTP.
Methods and classes using the word WebSocket now consistently use it as ``WebSocket``, so updating is as simple as
find-and-replacing the lower-case ``s`` to an upper-case ``S`` wherever the word WebSocket appeared.
find-and-replacing the lower-case ``s`` to an upper-case ``S`` wherever the word WebSocket appeared.
Java DSL for Http binding and connections changed
-------------------------------------------------
In order to minimise the number of needed overloads for each method defined on the ``Http`` extension
a new mini-DSL has been introduced for connecting to hosts given a hostname, port and optional ``ConnectionContext``.
The availability of the connection context (if it's set to ``HttpsConnectionContext``) makes the server be bound
as an HTTPS server, and for outgoing connections those settings are used instead of the default ones if provided.
Was::
http.cachedHostConnectionPool(toHost("akka.io"), materializer());
http.cachedHostConnectionPool("akka.io", 80, httpsConnectionContext, materializer()); // does not work anymore
Replace with::
http.cachedHostConnectionPool(toHostHttps("akka.io", 8081), materializer());
http.cachedHostConnectionPool(toHostHttps("akka.io", 8081).withCustomHttpsContext(httpsContext), materializer());

View file

@ -17,6 +17,12 @@ abstract class ConnectHttp {
final def effectiveHttpsConnectionContext(fallbackContext: HttpsConnectionContext): HttpsConnectionContext =
connectionContext.orElse(fallbackContext)
final def effectiveConnectionContext(fallbackContext: ConnectionContext): ConnectionContext =
if (connectionContext.isPresent) connectionContext.get()
else fallbackContext
override def toString = s"ConnectHttp($host,$port,$isHttps,$connectionContext)"
}
object ConnectHttp {
@ -35,7 +41,8 @@ object ConnectHttp {
def toHost(host: String, port: Int): ConnectHttp = {
require(port > 0, "port must be > 0")
toHost(Uri.create(host).port(port))
val start = if (host.startsWith("http://") || host.startsWith("https://")) host else s"http://$host"
toHost(Uri.create(start).port(port))
}
/**
@ -59,7 +66,8 @@ object ConnectHttp {
@throws(classOf[IllegalArgumentException])
def toHostHttps(host: String, port: Int): ConnectWithHttps = {
require(port > 0, "port must be > 0")
toHostHttps(Uri.create(host).port(port).host.address)
val start = if (host.startsWith("https://")) host else s"https://$host"
toHostHttps(Uri.create(s"$start").port(port))
}
private def effectivePort(uri: Uri): Int = {

View file

@ -83,39 +83,25 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension {
/**
* Creates a [[Source]] of [[IncomingConnection]] instances which represents a prospective HTTP server binding
* on the given `endpoint`.
*
* If the given port is 0 the resulting source can be materialized several times. Each materialization will
* then be assigned a new local port by the operating system, which can then be retrieved by the materialized
* [[ServerBinding]].
*
* If the given port is non-zero subsequent materialization attempts of the produced source will immediately
* fail, unless the first materialization has already been unbound. Unbinding can be triggered via the materialized
* [[ServerBinding]].
*
* The server will be bound using HTTPS if the [[ConnectHttp]] object is configured with an [[HttpsConnectionContext]],
* or the [[defaultServerHttpContext]] has been configured to be an [[HttpsConnectionContext]].
*/
def bind(connect: ConnectHttp, materializer: Materializer): Source[IncomingConnection, CompletionStage[ServerBinding]] = {
val connectionContext =
if (connect.connectionContext.isPresent) connect.connectionContext.get()
else defaultServerHttpContext
new Source(delegate.bind(connect.host, connect.port, connectionContext.asScala)(materializer)
val connectionContext = connect.effectiveConnectionContext(defaultServerHttpContext).asScala
new Source(delegate.bind(connect.host, connect.port, connectionContext)(materializer)
.map(new IncomingConnection(_))
.mapMaterializedValue(_.map(new ServerBinding(_))(ec).toJava))
}
/**
* Creates a [[Source]] of [[IncomingConnection]] instances which represents a prospective HTTP server binding
* on the given `endpoint`.
* If the given port is 0 the resulting source can be materialized several times. Each materialization will
* then be assigned a new local port by the operating system, which can then be retrieved by the materialized
* [[ServerBinding]].
* If the given port is non-zero subsequent materialization attempts of the produced source will immediately
* fail, unless the first materialization has already been unbound. Unbinding can be triggered via the materialized
* [[ServerBinding]].
*/
def bind(interface: String, port: Int, materializer: Materializer): Source[IncomingConnection, CompletionStage[ServerBinding]] =
new Source(delegate.bind(interface, port)(materializer)
.map(new IncomingConnection(_))
.mapMaterializedValue(_.map(new ServerBinding(_))(ec).toJava))
/**
* Creates a [[Source]] of [[IncomingConnection]] instances which represents a prospective HTTP server binding
* on the given `endpoint`.
@ -127,14 +113,18 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension {
* If the given port is non-zero subsequent materialization attempts of the produced source will immediately
* fail, unless the first materialization has already been unbound. Unbinding can be triggered via the materialized
* [[ServerBinding]].
*
* The server will be bound using HTTPS if the [[ConnectHttp]] object is configured with an [[HttpsConnectionContext]],
* or the [[defaultServerHttpContext]] has been configured to be an [[HttpsConnectionContext]].
*/
def bind(interface: String, port: Int,
connectionContext: ConnectionContext,
def bind(connect: ConnectHttp,
settings: ServerSettings,
materializer: Materializer): Source[IncomingConnection, CompletionStage[ServerBinding]] =
new Source(delegate.bind(interface, port, settings = settings.asScala, connectionContext = ConnectionContext.noEncryption().asScala)(materializer)
materializer: Materializer): Source[IncomingConnection, CompletionStage[ServerBinding]] = {
val connectionContext = connect.effectiveConnectionContext(defaultServerHttpContext).asScala
new Source(delegate.bind(connect.host, connect.port, settings = settings.asScala, connectionContext = connectionContext)(materializer)
.map(new IncomingConnection(_))
.mapMaterializedValue(_.map(new ServerBinding(_))(ec).toJava))
}
/**
* Creates a [[Source]] of [[IncomingConnection]] instances which represents a prospective HTTP server binding
@ -147,34 +137,19 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension {
* If the given port is non-zero subsequent materialization attempts of the produced source will immediately
* fail, unless the first materialization has already been unbound. Unbinding can be triggered via the materialized
* [[ServerBinding]].
*/
def bind(interface: String, port: Int,
connectionContext: ConnectionContext,
materializer: Materializer): Source[IncomingConnection, CompletionStage[ServerBinding]] =
new Source(delegate.bind(interface, port, connectionContext = connectionContext.asScala)(materializer)
.map(new IncomingConnection(_))
.mapMaterializedValue(_.map(new ServerBinding(_))(ec).toJava))
/**
* Creates a [[Source]] of [[IncomingConnection]] instances which represents a prospective HTTP server binding
* on the given `endpoint`.
*
* If the given port is 0 the resulting source can be materialized several times. Each materialization will
* then be assigned a new local port by the operating system, which can then be retrieved by the materialized
* [[ServerBinding]].
*
* If the given port is non-zero subsequent materialization attempts of the produced source will immediately
* fail, unless the first materialization has already been unbound. Unbinding can be triggered via the materialized
* [[ServerBinding]].
* The server will be bound using HTTPS if the [[ConnectHttp]] object is configured with an [[HttpsConnectionContext]],
* or the [[defaultServerHttpContext]] has been configured to be an [[HttpsConnectionContext]].
*/
def bind(interface: String, port: Int,
connectionContext: ConnectionContext,
def bind(connect: ConnectHttp,
settings: ServerSettings,
log: LoggingAdapter,
materializer: Materializer): Source[IncomingConnection, CompletionStage[ServerBinding]] =
new Source(delegate.bind(interface, port, ConnectionContext.noEncryption().asScala, settings.asScala, log)(materializer)
materializer: Materializer): Source[IncomingConnection, CompletionStage[ServerBinding]] = {
val connectionContext = connect.effectiveConnectionContext(defaultServerHttpContext).asScala
new Source(delegate.bind(connect.host, connect.port, connectionContext, settings.asScala, log)(materializer)
.map(new IncomingConnection(_))
.mapMaterializedValue(_.map(new ServerBinding(_))(ec).toJava))
}
/**
* Convenience method which starts a new HTTP server at the given endpoint and uses the given `handler`
@ -182,13 +157,18 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension {
*
* The number of concurrently accepted connections can be configured by overriding
* the `akka.http.server.max-connections` setting.
*
* The server will be bound using HTTPS if the [[ConnectHttp]] object is configured with an [[HttpsConnectionContext]],
* or the [[defaultServerHttpContext]] has been configured to be an [[HttpsConnectionContext]].
*/
def bindAndHandle(handler: Flow[HttpRequest, HttpResponse, _],
interface: String, port: Int,
materializer: Materializer): CompletionStage[ServerBinding] =
connect: ConnectHttp,
materializer: Materializer): CompletionStage[ServerBinding] = {
val connectionContext = connect.effectiveConnectionContext(defaultServerHttpContext).asScala
delegate.bindAndHandle(handler.asInstanceOf[Flow[sm.HttpRequest, sm.HttpResponse, _]].asScala,
interface, port)(materializer)
connect.host, connect.port, connectionContext)(materializer)
.map(new ServerBinding(_))(ec).toJava
}
/**
* Convenience method which starts a new HTTP server at the given endpoint and uses the given `handler`
@ -196,31 +176,20 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension {
*
* The number of concurrently accepted connections can be configured by overriding
* the `akka.http.server.max-connections` setting.
*/
def bindAndHandle(handler: Flow[HttpRequest, HttpResponse, _],
interface: String, port: Int,
connectionContext: ConnectionContext,
materializer: Materializer): CompletionStage[ServerBinding] =
delegate.bindAndHandle(handler.asInstanceOf[Flow[sm.HttpRequest, sm.HttpResponse, _]].asScala,
interface, port, connectionContext.asScala)(materializer)
.map(new ServerBinding(_))(ec).toJava
/**
* Convenience method which starts a new HTTP server at the given endpoint and uses the given `handler`
* [[Flow]] for processing all incoming connections.
*
* The number of concurrently accepted connections can be configured by overriding
* the `akka.http.server.max-connections` setting.
* The server will be bound using HTTPS if the [[ConnectHttp]] object is configured with an [[HttpsConnectionContext]],
* or the [[defaultServerHttpContext]] has been configured to be an [[HttpsConnectionContext]].
*/
def bindAndHandle(handler: Flow[HttpRequest, HttpResponse, _],
interface: String, port: Int,
connect: ConnectHttp,
settings: ServerSettings,
connectionContext: ConnectionContext,
log: LoggingAdapter,
materializer: Materializer): CompletionStage[ServerBinding] =
materializer: Materializer): CompletionStage[ServerBinding] = {
val connectionContext = connect.effectiveConnectionContext(defaultServerHttpContext).asScala
delegate.bindAndHandle(handler.asInstanceOf[Flow[sm.HttpRequest, sm.HttpResponse, _]].asScala,
interface, port, connectionContext.asScala, settings.asScala, log)(materializer)
connect.host, connect.port, connectionContext, settings.asScala, log)(materializer)
.map(new ServerBinding(_))(ec).toJava
}
/**
* Convenience method which starts a new HTTP server at the given endpoint and uses the given `handler`
@ -228,12 +197,17 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension {
*
* The number of concurrently accepted connections can be configured by overriding
* the `akka.http.server.max-connections` setting.
*
* The server will be bound using HTTPS if the [[ConnectHttp]] object is configured with an [[HttpsConnectionContext]],
* or the [[defaultServerHttpContext]] has been configured to be an [[HttpsConnectionContext]].
*/
def bindAndHandleSync(handler: Function[HttpRequest, HttpResponse],
interface: String, port: Int,
materializer: Materializer): CompletionStage[ServerBinding] =
delegate.bindAndHandleSync(handler.apply(_).asScala, interface, port)(materializer)
connect: ConnectHttp,
materializer: Materializer): CompletionStage[ServerBinding] = {
val connectionContext = connect.effectiveConnectionContext(defaultServerHttpContext).asScala
delegate.bindAndHandleSync(handler.apply(_).asScala, connect.host, connect.port, connectionContext)(materializer)
.map(new ServerBinding(_))(ec).toJava
}
/**
* Convenience method which starts a new HTTP server at the given endpoint and uses the given `handler`
@ -241,30 +215,20 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension {
*
* The number of concurrently accepted connections can be configured by overriding
* the `akka.http.server.max-connections` setting.
*/
def bindAndHandleSync(handler: Function[HttpRequest, HttpResponse],
interface: String, port: Int,
connectionContext: ConnectionContext,
materializer: Materializer): CompletionStage[ServerBinding] =
delegate.bindAndHandleSync(handler.apply(_).asScala, interface, port, connectionContext.asScala)(materializer)
.map(new ServerBinding(_))(ec).toJava
/**
* Convenience method which starts a new HTTP server at the given endpoint and uses the given `handler`
* [[Flow]] for processing all incoming connections.
*
* The number of concurrently accepted connections can be configured by overriding
* the `akka.http.server.max-connections` setting.
* The server will be bound using HTTPS if the [[ConnectHttp]] object is configured with an [[HttpsConnectionContext]],
* or the [[defaultServerHttpContext]] has been configured to be an [[HttpsConnectionContext]].
*/
def bindAndHandleSync(handler: Function[HttpRequest, HttpResponse],
interface: String, port: Int,
connect: ConnectHttp,
settings: ServerSettings,
connectionContext: ConnectionContext,
log: LoggingAdapter,
materializer: Materializer): CompletionStage[ServerBinding] =
materializer: Materializer): CompletionStage[ServerBinding] = {
val connectionContext = connect.effectiveConnectionContext(defaultServerHttpContext).asScala
delegate.bindAndHandleSync(handler.apply(_).asScala,
interface, port, connectionContext.asScala, settings.asScala, log)(materializer)
connect.host, connect.port, connectionContext, settings.asScala, log)(materializer)
.map(new ServerBinding(_))(ec).toJava
}
/**
* Convenience method which starts a new HTTP server at the given endpoint and uses the given `handler`
@ -272,12 +236,17 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension {
*
* The number of concurrently accepted connections can be configured by overriding
* the `akka.http.server.max-connections` setting.
*
* The server will be bound using HTTPS if the [[ConnectHttp]] object is configured with an [[HttpsConnectionContext]],
* or the [[defaultServerHttpContext]] has been configured to be an [[HttpsConnectionContext]].
*/
def bindAndHandleAsync(handler: Function[HttpRequest, CompletionStage[HttpResponse]],
interface: String, port: Int,
materializer: Materializer): CompletionStage[ServerBinding] =
delegate.bindAndHandleAsync(handler.apply(_).toScala, interface, port)(materializer)
connect: ConnectHttp,
materializer: Materializer): CompletionStage[ServerBinding] = {
val connectionContext = connect.effectiveConnectionContext(defaultServerHttpContext).asScala
delegate.bindAndHandleAsync(handler.apply(_).toScala, connect.host, connect.port, connectionContext)(materializer)
.map(new ServerBinding(_))(ec).toJava
}
/**
* Convenience method which starts a new HTTP server at the given endpoint and uses the given `handler`
@ -285,29 +254,20 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension {
*
* The number of concurrently accepted connections can be configured by overriding
* the `akka.http.server.max-connections` setting.
*/
def bindAndHandleAsync(handler: Function[HttpRequest, CompletionStage[HttpResponse]],
interface: String, port: Int,
connectionContext: ConnectionContext,
materializer: Materializer): CompletionStage[ServerBinding] =
delegate.bindAndHandleAsync(handler.apply(_).toScala, interface, port, connectionContext.asScala)(materializer)
.map(new ServerBinding(_))(ec).toJava
/**
* Convenience method which starts a new HTTP server at the given endpoint and uses the given `handler`
* [[Flow]] for processing all incoming connections.
*
* The number of concurrently accepted connections can be configured by overriding
* the `akka.http.server.max-connections` setting.
* The server will be bound using HTTPS if the [[ConnectHttp]] object is configured with an [[HttpsConnectionContext]],
* or the [[defaultServerHttpContext]] has been configured to be an [[HttpsConnectionContext]].
*/
def bindAndHandleAsync(handler: Function[HttpRequest, CompletionStage[HttpResponse]],
interface: String, port: Int,
settings: ServerSettings, connectionContext: ConnectionContext,
connect: ConnectHttp,
settings: ServerSettings,
parallelism: Int, log: LoggingAdapter,
materializer: Materializer): CompletionStage[ServerBinding] =
materializer: Materializer): CompletionStage[ServerBinding] = {
val connectionContext = connect.effectiveConnectionContext(defaultServerHttpContext).asScala
delegate.bindAndHandleAsync(handler.apply(_).toScala,
interface, port, connectionContext.asScala, settings.asScala, parallelism, log)(materializer)
connect.host, connect.port, connectionContext, settings.asScala, parallelism, log)(materializer)
.map(new ServerBinding(_))(ec).toJava
}
/**
* Constructs a client layer stage using the configured default [[akka.http.javadsl.settings.ClientConnectionSettings]].
@ -355,16 +315,15 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension {
* Creates a [[Flow]] representing a prospective HTTP client connection to the given endpoint.
* Every materialization of the produced flow will attempt to establish a new outgoing connection.
*/
def outgoingConnection(host: String, port: Int,
connectionContext: ConnectionContext,
def outgoingConnection(to: ConnectHttp,
localAddress: Optional[InetSocketAddress],
settings: ClientConnectionSettings,
log: LoggingAdapter): Flow[HttpRequest, HttpResponse, CompletionStage[OutgoingConnection]] =
adaptOutgoingFlow {
connectionContext match {
case https: HttpsConnectionContext delegate.outgoingConnectionHttps(host, port, https.asScala, localAddress.asScala, settings.asScala, log)
case _ delegate.outgoingConnection(host, port, localAddress.asScala, settings.asScala, log)
}
if (to.isHttps)
delegate.outgoingConnectionHttps(to.host, to.port, to.effectiveConnectionContext(defaultClientHttpsContext).asInstanceOf[HttpsConnectionContext].asScala, localAddress.asScala, settings.asScala, log)
else
delegate.outgoingConnection(to.host, to.port, localAddress.asScala, settings.asScala, log)
}
/**
@ -414,7 +373,7 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension {
case https: HttpsConnectionContext
delegate.newHostConnectionPoolHttps[T](to.host, to.port, https.asScala, settings.asScala, log)(materializer)
.mapMaterializedValue(_.toJava)
case _
case _
delegate.newHostConnectionPool[T](to.host, to.port, settings.asScala, log)(materializer)
.mapMaterializedValue(_.toJava)
}
@ -682,7 +641,7 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension {
/**
* Gets the default
*
*
* @return
*/
def defaultServerHttpContext: ConnectionContext =

View file

@ -896,7 +896,7 @@ final case class `WWW-Authenticate`(challenges: immutable.Seq[HttpChallenge]) ex
}
// http://en.wikipedia.org/wiki/X-Forwarded-For
object `X-Forwarded-For` extends ModeledCompanion[`X-Forwarded-For`] {
object `X-Forwarded-For` extends ModeledCompanion[`X-Forwarded-For`] {
def apply(first: RemoteAddress, more: RemoteAddress*): `X-Forwarded-For` = apply(immutable.Seq(first +: more: _*))
implicit val addressesRenderer = Renderer.defaultSeqRenderer[RemoteAddress] // cache
}
@ -911,9 +911,9 @@ final case class `X-Forwarded-For`(addresses: immutable.Seq[RemoteAddress]) exte
def getAddresses: Iterable[jm.RemoteAddress] = addresses.asJava
}
object `X-Real-Ip` extends ModeledCompanion[`X-Real-Ip`]
final case class `X-Real-Ip`(address:RemoteAddress) extends jm.headers.XRealIp
object `X-Real-Ip` extends ModeledCompanion[`X-Real-Ip`]
final case class `X-Real-Ip`(address: RemoteAddress) extends jm.headers.XRealIp
with RequestHeader {
def renderValue[R <: Rendering](r: R): r.type = r ~~ address
protected def companion = `X-Real-Ip`
protected def companion = `X-Real-Ip`
}

View file

@ -6,9 +6,9 @@ package akka.http.javadsl.model;
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.http.javadsl.ConnectHttp;
import akka.http.javadsl.Http;
import akka.http.javadsl.ServerBinding;
import akka.japi.Function;
import akka.http.javadsl.model.ws.Message;
import akka.http.javadsl.model.ws.TextMessage;
import akka.http.javadsl.model.ws.WebSocket;
@ -17,9 +17,6 @@ import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Source;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
import java.io.BufferedReader;
import java.io.InputStreamReader;
@ -35,18 +32,16 @@ public class JavaTestServer {
CompletionStage<ServerBinding> serverBindingFuture =
Http.get(system).bindAndHandleSync(
new Function<HttpRequest, HttpResponse>() {
public HttpResponse apply(HttpRequest request) throws Exception {
System.out.println("Handling request to " + request.getUri());
request -> {
System.out.println("Handling request to " + request.getUri());
if (request.getUri().path().equals("/"))
return WebSocket.handleWebSocketRequestWith(request, echoMessages());
else if (request.getUri().path().equals("/greeter"))
return WebSocket.handleWebSocketRequestWith(request, greeter());
else
return JavaApiTestCases.handleRequest(request);
}
}, "localhost", 8080, materializer);
if (request.getUri().path().equals("/"))
return WebSocket.handleWebSocketRequestWith(request, echoMessages());
else if (request.getUri().path().equals("/greeter"))
return WebSocket.handleWebSocketRequestWith(request, greeter());
else
return JavaApiTestCases.handleRequest(request);
}, ConnectHttp.toHost("localhost", 8080), materializer);
serverBindingFuture.toCompletableFuture().get(1, TimeUnit.SECONDS); // will throw if binding fails
System.out.println("Press ENTER to stop.");

View file

@ -9,6 +9,7 @@ import java.util.concurrent.{ CompletionStage, TimeUnit, CompletableFuture }
import akka.NotUsed
import akka.http.impl.util.JavaMapping.HttpsConnectionContext
import akka.http.javadsl.ConnectHttp._
import akka.http.javadsl.model.ws._
import akka.http.javadsl.settings.{ ClientConnectionSettings, ConnectionPoolSettings, ServerSettings }
import akka.japi.Pair
@ -72,7 +73,7 @@ class HttpExtensionApiSpec extends WordSpec with Matchers with BeforeAndAfterAll
"properly bind a server (with three parameters)" in {
val (_, host, port) = TestUtils.temporaryServerHostnameAndPort()
val probe = TestSubscriber.manualProbe[IncomingConnection]()
val binding = http.bind(host, port, materializer)
val binding = http.bind(toHost(host, port), materializer)
.toMat(Sink.fromSubscriber(probe), Keep.left)
.run(materializer)
val sub = probe.expectSubscription()
@ -83,7 +84,7 @@ class HttpExtensionApiSpec extends WordSpec with Matchers with BeforeAndAfterAll
"properly bind a server (with four parameters)" in {
val (_, host, port) = TestUtils.temporaryServerHostnameAndPort()
val probe = TestSubscriber.manualProbe[IncomingConnection]()
val binding = http.bind(host, port, connectionContext, materializer)
val binding = http.bind(toHost(host, port), materializer)
.toMat(Sink.fromSubscriber(probe), Keep.left)
.run(materializer)
val sub = probe.expectSubscription()
@ -94,7 +95,7 @@ class HttpExtensionApiSpec extends WordSpec with Matchers with BeforeAndAfterAll
"properly bind a server (with five parameters)" in {
val (_, host, port) = TestUtils.temporaryServerHostnameAndPort()
val probe = TestSubscriber.manualProbe[IncomingConnection]()
val binding = http.bind(host, port, connectionContext, serverSettings, materializer)
val binding = http.bind(toHost(host, port), serverSettings, materializer)
.toMat(Sink.fromSubscriber(probe), Keep.left)
.run(materializer)
val sub = probe.expectSubscription()
@ -105,7 +106,7 @@ class HttpExtensionApiSpec extends WordSpec with Matchers with BeforeAndAfterAll
"properly bind a server (with six parameters)" in {
val (_, host, port) = TestUtils.temporaryServerHostnameAndPort()
val probe = TestSubscriber.manualProbe[IncomingConnection]()
val binding = http.bind(host, port, connectionContext, serverSettings, loggingAdapter, materializer)
val binding = http.bind(toHost(host, port), serverSettings, loggingAdapter, materializer)
.toMat(Sink.fromSubscriber(probe), Keep.left)
.run(materializer)
val sub = probe.expectSubscription()
@ -119,9 +120,9 @@ class HttpExtensionApiSpec extends WordSpec with Matchers with BeforeAndAfterAll
val flow: Flow[HttpRequest, HttpResponse, NotUsed] = akka.stream.scaladsl.Flow[HttpRequest]
.map(req HttpResponse.create())
.asJava
val binding = http.bindAndHandle(flow, host, port, materializer)
val binding = http.bindAndHandle(flow, toHost(host, port), materializer)
val (_, completion) = http.outgoingConnection(ConnectHttp.toHost(host, port))
val (_, completion) = http.outgoingConnection(toHost(host, port))
.runWith(Source.single(HttpRequest.create("/abc")), Sink.head(), materializer).toScala
waitFor(completion)
@ -133,9 +134,9 @@ class HttpExtensionApiSpec extends WordSpec with Matchers with BeforeAndAfterAll
val flow: Flow[HttpRequest, HttpResponse, NotUsed] = akka.stream.scaladsl.Flow[HttpRequest]
.map(req HttpResponse.create())
.asJava
val binding = http.bindAndHandle(flow, host, port, connectionContext, materializer)
val binding = http.bindAndHandle(flow, toHost(host, port), materializer)
val (_, completion) = http.outgoingConnection(ConnectHttp.toHost(host, port))
val (_, completion) = http.outgoingConnection(toHost(host, port))
.runWith(Source.single(HttpRequest.create("/abc")), Sink.head(), materializer).toScala
waitFor(completion)
@ -147,9 +148,9 @@ class HttpExtensionApiSpec extends WordSpec with Matchers with BeforeAndAfterAll
val flow: Flow[HttpRequest, HttpResponse, NotUsed] = akka.stream.scaladsl.Flow[HttpRequest]
.map(req HttpResponse.create())
.asJava
val binding = http.bindAndHandle(flow, host, port, serverSettings, connectionContext, loggingAdapter, materializer)
val binding = http.bindAndHandle(flow, toHost(host, port), serverSettings, loggingAdapter, materializer)
val (_, completion) = http.outgoingConnection(ConnectHttp.toHost(host, port))
val (_, completion) = http.outgoingConnection(toHost(host, port))
.runWith(Source.single(HttpRequest.create("/abc")), Sink.head(), materializer).toScala
waitFor(completion)
@ -158,7 +159,7 @@ class HttpExtensionApiSpec extends WordSpec with Matchers with BeforeAndAfterAll
"properly bind and handle a server with a synchronous function (with four parameters)" in {
val (_, host, port) = TestUtils.temporaryServerHostnameAndPort()
val binding = http.bindAndHandleSync(httpSuccessFunction, host, port, materializer)
val binding = http.bindAndHandleSync(httpSuccessFunction, toHost(host, port), materializer)
val response = http.singleRequest(HttpRequest.create(s"http://$host:$port/").withMethod(HttpMethods.GET), materializer)
@ -168,7 +169,7 @@ class HttpExtensionApiSpec extends WordSpec with Matchers with BeforeAndAfterAll
"properly bind and handle a server with a synchronous function (with five parameters)" in {
val (_, host, port) = TestUtils.temporaryServerHostnameAndPort()
val binding = http.bindAndHandleSync(httpSuccessFunction, host, port, connectionContext, materializer)
val binding = http.bindAndHandleSync(httpSuccessFunction, toHost(host, port), materializer)
val response = http.singleRequest(HttpRequest.create(s"http://$host:$port/").withMethod(HttpMethods.GET), materializer)
@ -178,7 +179,7 @@ class HttpExtensionApiSpec extends WordSpec with Matchers with BeforeAndAfterAll
"properly bind and handle a server with a synchronous (with seven parameters)" in {
val (_, host, port) = TestUtils.temporaryServerHostnameAndPort()
val binding = http.bindAndHandleSync(httpSuccessFunction, host, port, serverSettings, connectionContext, loggingAdapter, materializer)
val binding = http.bindAndHandleSync(httpSuccessFunction, toHost(host, port), serverSettings, loggingAdapter, materializer)
val response = http.singleRequest(HttpRequest.create(s"http://$host:$port/").withMethod(HttpMethods.GET), materializer)
@ -188,7 +189,7 @@ class HttpExtensionApiSpec extends WordSpec with Matchers with BeforeAndAfterAll
"properly bind and handle a server with an asynchronous function (with four parameters)" in {
val (_, host, port) = TestUtils.temporaryServerHostnameAndPort()
val binding = http.bindAndHandleAsync(asyncHttpSuccessFunction, host, port, materializer)
val binding = http.bindAndHandleAsync(asyncHttpSuccessFunction, toHost(host, port), materializer)
val response = http.singleRequest(HttpRequest.create(s"http://$host:$port/").withMethod(HttpMethods.GET), materializer)
@ -198,7 +199,7 @@ class HttpExtensionApiSpec extends WordSpec with Matchers with BeforeAndAfterAll
"properly bind and handle a server with an asynchronous function (with five parameters)" in {
val (_, host, port) = TestUtils.temporaryServerHostnameAndPort()
val binding = http.bindAndHandleAsync(asyncHttpSuccessFunction, host, port, connectionContext, materializer)
val binding = http.bindAndHandleAsync(asyncHttpSuccessFunction, toHost(host, port), materializer)
val response = http.singleRequest(HttpRequest.create(s"http://$host:$port/").withMethod(HttpMethods.GET), materializer)
@ -208,7 +209,7 @@ class HttpExtensionApiSpec extends WordSpec with Matchers with BeforeAndAfterAll
"properly bind and handle a server with an asynchronous function (with eight parameters)" in {
val (_, host, port) = TestUtils.temporaryServerHostnameAndPort()
val binding = http.bindAndHandleAsync(asyncHttpSuccessFunction, host, port, serverSettings, connectionContext, 1, loggingAdapter, materializer)
val binding = http.bindAndHandleAsync(asyncHttpSuccessFunction, toHost(host, port), serverSettings, 1, loggingAdapter, materializer)
val response = http.singleRequest(HttpRequest.create(s"http://$host:$port/").withMethod(HttpMethods.GET), materializer)
@ -236,7 +237,7 @@ class HttpExtensionApiSpec extends WordSpec with Matchers with BeforeAndAfterAll
val (host, port, binding) = runServer()
val poolFlow: Flow[Pair[HttpRequest, NotUsed], Pair[Try[HttpResponse], NotUsed], HostConnectionPool] =
http.cachedHostConnectionPool[NotUsed](ConnectHttp.toHost(host, port), materializer)
http.cachedHostConnectionPool[NotUsed](toHost(host, port), materializer)
val pair: Pair[HostConnectionPool, CompletionStage[Pair[Try[HttpResponse], NotUsed]]] =
Source.single(new Pair(HttpRequest.GET(s"http://$host:$port/"), NotUsed.getInstance()))
@ -256,7 +257,7 @@ class HttpExtensionApiSpec extends WordSpec with Matchers with BeforeAndAfterAll
val (host, port, binding) = runServer()
val poolFlow: Flow[Pair[HttpRequest, NotUsed], Pair[Try[HttpResponse], NotUsed], HostConnectionPool] =
http.cachedHostConnectionPool[NotUsed](ConnectHttp.toHost(host, port), poolSettings, loggingAdapter, materializer)
http.cachedHostConnectionPool[NotUsed](toHost(host, port), poolSettings, loggingAdapter, materializer)
val pair: Pair[HostConnectionPool, CompletionStage[Pair[Try[HttpResponse], NotUsed]]] =
Source.single(new Pair(HttpRequest.GET(s"http://$host:$port/"), NotUsed.getInstance()))
@ -291,7 +292,7 @@ class HttpExtensionApiSpec extends WordSpec with Matchers with BeforeAndAfterAll
"create a host connection pool (with a ConnectHttp and a Materializer)" in {
val (host, port, binding) = runServer()
val poolFlow = http.newHostConnectionPool[NotUsed](ConnectHttp.toHost(host, port), materializer)
val poolFlow = http.newHostConnectionPool[NotUsed](toHost(host, port), materializer)
val pair: Pair[HostConnectionPool, CompletionStage[Pair[Try[HttpResponse], NotUsed]]] =
Source.single(new Pair(get(host, port), NotUsed.getInstance()))
@ -311,7 +312,7 @@ class HttpExtensionApiSpec extends WordSpec with Matchers with BeforeAndAfterAll
pending
val (host, port, binding) = runServer()
val poolFlow = http.newHostConnectionPool[NotUsed](ConnectHttp.toHost(host, port), poolSettings, loggingAdapter, materializer)
val poolFlow = http.newHostConnectionPool[NotUsed](toHost(host, port), poolSettings, loggingAdapter, materializer)
val pair: Pair[HostConnectionPool, CompletionStage[Pair[Try[HttpResponse], NotUsed]]] =
Source.single(new Pair(get(host, port), NotUsed.getInstance()))
@ -369,7 +370,7 @@ class HttpExtensionApiSpec extends WordSpec with Matchers with BeforeAndAfterAll
"create an outgoing connection (with a ConnectHttp)" in {
val (host, port, binding) = runServer()
val flow = http.outgoingConnection(ConnectHttp.toHost(host, port))
val flow = http.outgoingConnection(toHost(host, port))
val response = Source.single(get(host, port))
.via(flow)
@ -382,10 +383,9 @@ class HttpExtensionApiSpec extends WordSpec with Matchers with BeforeAndAfterAll
"create an outgoing connection (with 6 parameters)" in {
val (host, port, binding) = runServer()
println("host = " + host)
val flow = http.outgoingConnection(
host,
port,
connectionContext,
toHost(host, port),
Optional.empty(),
ClientConnectionSettings.create(system),
NoLogging)
@ -455,7 +455,7 @@ class HttpExtensionApiSpec extends WordSpec with Matchers with BeforeAndAfterAll
def runServer(): (Host, Port, ServerBinding) = {
val (_, host, port) = TestUtils.temporaryServerHostnameAndPort()
val server = http.bindAndHandleSync(httpSuccessFunction, host, port, materializer)
val server = http.bindAndHandleSync(httpSuccessFunction, toHost(host, port), materializer)
(host, port, waitFor(server))
}
@ -467,7 +467,7 @@ class HttpExtensionApiSpec extends WordSpec with Matchers with BeforeAndAfterAll
override def apply(request: HttpRequest): HttpResponse = {
WebSocket.handleWebSocketRequestWith(request, Flow.create[Message]())
}
}, host, port, materializer)
}, toHost(host, port), materializer)
(host, port, waitFor(server))
}

View file

@ -36,22 +36,22 @@ public class HttpAPIsTest extends JUnitRouteTest {
ConnectionPoolSettings conSettings = null;
LoggingAdapter log = null;
http.bind("127.0.0.1", 8080, materializer());
http.bind("127.0.0.1", 8080, connectionContext, materializer());
http.bind("127.0.0.1", 8080, httpContext, materializer());
http.bind("127.0.0.1", 8080, httpsContext, materializer());
http.bind(toHost("127.0.0.1", 8080), materializer());
http.bind(toHost("127.0.0.1", 8080), materializer());
http.bind(toHostHttps("127.0.0.1", 8080), materializer());
final Flow<HttpRequest, HttpResponse, ?> handler = null;
http.bindAndHandle(handler, "127.0.0.1", 8080, materializer());
http.bindAndHandle(handler, "127.0.0.1", 8080, httpsContext, materializer());
http.bindAndHandle(handler, toHost("127.0.0.1", 8080), materializer());
http.bindAndHandle(handler, toHost("127.0.0.1", 8080), materializer());
http.bindAndHandle(handler, toHostHttps("127.0.0.1", 8080).withCustomHttpsContext(httpsContext), materializer());
final Function<HttpRequest, CompletionStage<HttpResponse>> handler1 = null;
http.bindAndHandleAsync(handler1, "127.0.0.1", 8080, materializer());
http.bindAndHandleAsync(handler1, "127.0.0.1", 8080, httpsContext, materializer());
http.bindAndHandleAsync(handler1, toHost("127.0.0.1", 8080), materializer());
http.bindAndHandleAsync(handler1, toHostHttps("127.0.0.1", 8080), materializer());
final Function<HttpRequest, HttpResponse> handler2 = null;
http.bindAndHandleSync(handler2, "127.0.0.1", 8080, materializer());
http.bindAndHandleSync(handler2, "127.0.0.1", 8080, httpsContext, materializer());
http.bindAndHandleSync(handler2, toHost("127.0.0.1", 8080), materializer());
http.bindAndHandleSync(handler2, toHostHttps("127.0.0.1", 8080), materializer());
final HttpRequest handler3 = null;
http.singleRequest(handler3, materializer());

View file

@ -363,12 +363,12 @@ object StreamLayout {
}
final case class CompositeModule(
override val subModules: Set[Module],
override val shape: Shape,
override val downstreams: Map[OutPort, InPort],
override val upstreams: Map[InPort, OutPort],
override val materializedValueComputation: MaterializedValueNode,
override val attributes: Attributes) extends Module {
override val subModules: Set[Module],
override val shape: Shape,
override val downstreams: Map[OutPort, InPort],
override val upstreams: Map[InPort, OutPort],
override val materializedValueComputation: MaterializedValueNode,
override val attributes: Attributes) extends Module {
override def replaceShape(s: Shape): Module =
if (s != shape) {
@ -395,13 +395,13 @@ object StreamLayout {
}
final case class FusedModule(
override val subModules: Set[Module],
override val shape: Shape,
override val downstreams: Map[OutPort, InPort],
override val upstreams: Map[InPort, OutPort],
override val materializedValueComputation: MaterializedValueNode,
override val attributes: Attributes,
info: Fusing.StructuralInfo) extends Module {
override val subModules: Set[Module],
override val shape: Shape,
override val downstreams: Map[OutPort, InPort],
override val upstreams: Map[InPort, OutPort],
override val materializedValueComputation: MaterializedValueNode,
override val attributes: Attributes,
info: Fusing.StructuralInfo) extends Module {
override def isFused: Boolean = true

View file

@ -347,7 +347,7 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
* @see [[#mapAsyncUnordered]]
*/
def mapAsync[T](parallelism: Int, f: function.Function[Out, CompletionStage[T]]): javadsl.Flow[In, T, Mat] =
new Flow(delegate.mapAsync(parallelism)(x => f(x).toScala))
new Flow(delegate.mapAsync(parallelism)(x f(x).toScala))
/**
* Transform this stream by applying the given function to each of the elements
@ -379,7 +379,7 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
* @see [[#mapAsync]]
*/
def mapAsyncUnordered[T](parallelism: Int, f: function.Function[Out, CompletionStage[T]]): javadsl.Flow[In, T, Mat] =
new Flow(delegate.mapAsyncUnordered(parallelism)(x => f(x).toScala))
new Flow(delegate.mapAsyncUnordered(parallelism)(x f(x).toScala))
/**
* Only pass on those elements that satisfy the given predicate.
@ -1461,7 +1461,7 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
def zipMat[T, M, M2](that: Graph[SourceShape[T], M],
matF: function.Function2[Mat, M, M2]): javadsl.Flow[In, Out @uncheckedVariance Pair T, M2] =
this.viaMat(Flow.fromGraph(GraphDSL.create(that,
new function.Function2[GraphDSL.Builder[M], SourceShape[T], FlowShape[Out, Out @uncheckedVariance Pair T]] {
new function.Function2[GraphDSL.Builder[M], SourceShape[T], FlowShape[Out, Out @ uncheckedVariance Pair T]] {
def apply(b: GraphDSL.Builder[M], s: SourceShape[T]): FlowShape[Out, Out @uncheckedVariance Pair T] = {
val zip: FanInShape2[Out, T, Out Pair T] = b.add(Zip.create[Out, T])
b.from(s).toInlet(zip.in1)
@ -1638,7 +1638,7 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
* downstream.
*/
def watchTermination[M]()(matF: function.Function2[Mat, CompletionStage[Done], M]): javadsl.Flow[In, Out, M] =
new Flow(delegate.watchTermination()((left, right) => matF(left, right.toJava)))
new Flow(delegate.watchTermination()((left, right) matF(left, right.toJava)))
/**
* Delays the initial element by the specified duration.

View file

@ -838,7 +838,7 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap
* @see [[#mapAsyncUnordered]]
*/
def mapAsync[T](parallelism: Int, f: function.Function[Out, CompletionStage[T]]): javadsl.Source[T, Mat] =
new Source(delegate.mapAsync(parallelism)(x => f(x).toScala))
new Source(delegate.mapAsync(parallelism)(x f(x).toScala))
/**
* Transform this stream by applying the given function to each of the elements
@ -870,7 +870,7 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap
* @see [[#mapAsync]]
*/
def mapAsyncUnordered[T](parallelism: Int, f: function.Function[Out, CompletionStage[T]]): javadsl.Source[T, Mat] =
new Source(delegate.mapAsyncUnordered(parallelism)(x => f(x).toScala))
new Source(delegate.mapAsyncUnordered(parallelism)(x f(x).toScala))
/**
* Only pass on those elements that satisfy the given predicate.
@ -1777,7 +1777,7 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap
* downstream.
*/
def watchTermination[M]()(matF: function.Function2[Mat, CompletionStage[Done], M]): javadsl.Source[Out, M] =
new Source(delegate.watchTermination()((left, right) => matF(left, right.toJava)))
new Source(delegate.watchTermination()((left, right) matF(left, right.toJava)))
/**
* Delays the initial element by the specified duration.

View file

@ -201,7 +201,7 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo
* @see [[#mapAsyncUnordered]]
*/
def mapAsync[T](parallelism: Int, f: function.Function[Out, CompletionStage[T]]): SubFlow[In, T, Mat] =
new SubFlow(delegate.mapAsync(parallelism)(x => f(x).toScala))
new SubFlow(delegate.mapAsync(parallelism)(x f(x).toScala))
/**
* Transform this stream by applying the given function to each of the elements
@ -233,7 +233,7 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo
* @see [[#mapAsync]]
*/
def mapAsyncUnordered[T](parallelism: Int, f: function.Function[Out, CompletionStage[T]]): SubFlow[In, T, Mat] =
new SubFlow(delegate.mapAsyncUnordered(parallelism)(x => f(x).toScala))
new SubFlow(delegate.mapAsyncUnordered(parallelism)(x f(x).toScala))
/**
* Only pass on those elements that satisfy the given predicate.

View file

@ -199,7 +199,7 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source
* @see [[#mapAsyncUnordered]]
*/
def mapAsync[T](parallelism: Int, f: function.Function[Out, CompletionStage[T]]): SubSource[T, Mat] =
new SubSource(delegate.mapAsync(parallelism)(x => f(x).toScala))
new SubSource(delegate.mapAsync(parallelism)(x f(x).toScala))
/**
* Transform this stream by applying the given function to each of the elements
@ -231,7 +231,7 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source
* @see [[#mapAsync]]
*/
def mapAsyncUnordered[T](parallelism: Int, f: function.Function[Out, CompletionStage[T]]): SubSource[T, Mat] =
new SubSource(delegate.mapAsyncUnordered(parallelism)(x => f(x).toScala))
new SubSource(delegate.mapAsyncUnordered(parallelism)(x f(x).toScala))
/**
* Only pass on those elements that satisfy the given predicate.

View file

@ -1009,7 +1009,7 @@ object GraphDSL extends GraphApply {
}
private class PortOpsImpl[+Out](override val outlet: Outlet[Out @uncheckedVariance], b: Builder[_])
extends PortOps[Out] {
extends PortOps[Out] {
override def withAttributes(attr: Attributes): Repr[Out] = throw settingAttrNotSupported
override def addAttributes(attr: Attributes): Repr[Out] = throw settingAttrNotSupported