diff --git a/akka-actor/src/main/scala/akka/routing/RouterConfig.scala b/akka-actor/src/main/scala/akka/routing/RouterConfig.scala index a32142642c..75e1057ed5 100644 --- a/akka-actor/src/main/scala/akka/routing/RouterConfig.scala +++ b/akka-actor/src/main/scala/akka/routing/RouterConfig.scala @@ -42,7 +42,7 @@ trait RouterConfig extends Serializable { /** * Create the actual router, responsible for routing messages to routees. - * + * * @param system the ActorSystem this router belongs to */ def createRouter(system: ActorSystem): Router diff --git a/akka-docs/rst/java/code/docs/http/javadsl/server/HttpServerExampleDocTest.java b/akka-docs/rst/java/code/docs/http/javadsl/server/HttpServerExampleDocTest.java index f6431e2ab8..5efc1517f4 100644 --- a/akka-docs/rst/java/code/docs/http/javadsl/server/HttpServerExampleDocTest.java +++ b/akka-docs/rst/java/code/docs/http/javadsl/server/HttpServerExampleDocTest.java @@ -6,6 +6,8 @@ package docs.http.javadsl.server; import akka.NotUsed; import akka.actor.ActorSystem; +import akka.dispatch.OnFailure; +import akka.http.javadsl.ConnectHttp; import akka.http.javadsl.Http; import akka.http.javadsl.IncomingConnection; import akka.http.javadsl.ServerBinding; @@ -38,7 +40,7 @@ public class HttpServerExampleDocTest { Materializer materializer = ActorMaterializer.create(system); Source> serverSource = - Http.get(system).bind("localhost", 8080, materializer); + Http.get(system).bind(ConnectHttp.toHost("localhost", 8080), materializer); CompletionStage serverBindingFuture = serverSource.to(Sink.foreach(connection -> { @@ -56,7 +58,7 @@ public class HttpServerExampleDocTest { Materializer materializer = ActorMaterializer.create(system); Source> serverSource = - Http.get(system).bind("localhost", 80, materializer); + Http.get(system).bind(ConnectHttp.toHost("localhost", 80), materializer); CompletionStage serverBindingFuture = serverSource.to(Sink.foreach(connection -> { @@ -78,7 +80,7 @@ public class HttpServerExampleDocTest { Materializer materializer = ActorMaterializer.create(system); Source> serverSource = - Http.get(system).bind("localhost", 8080, materializer); + Http.get(system).bind(ConnectHttp.toHost("localhost", 8080), materializer); Flow failureDetection = Flow.of(IncomingConnection.class).watchTermination((notUsed, termination) -> { @@ -108,7 +110,7 @@ public class HttpServerExampleDocTest { Materializer materializer = ActorMaterializer.create(system); Source> serverSource = - Http.get(system).bind("localhost", 8080, materializer); + Http.get(system).bind(ConnectHttp.toHost("localhost", 8080), materializer); Flow failureDetection = Flow.of(HttpRequest.class) @@ -151,7 +153,7 @@ public class HttpServerExampleDocTest { final Materializer materializer = ActorMaterializer.create(system); Source> serverSource = - Http.get(system).bind("localhost", 8080, materializer); + Http.get(system).bind(ConnectHttp.toHost("localhost", 8080), materializer); //#request-handler final Function requestHandler = diff --git a/akka-docs/rst/java/code/docs/http/javadsl/server/WebSocketCoreExample.java b/akka-docs/rst/java/code/docs/http/javadsl/server/WebSocketCoreExample.java index e8232a2aa4..12f3b97698 100644 --- a/akka-docs/rst/java/code/docs/http/javadsl/server/WebSocketCoreExample.java +++ b/akka-docs/rst/java/code/docs/http/javadsl/server/WebSocketCoreExample.java @@ -11,6 +11,7 @@ import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit; import akka.NotUsed; +import akka.http.javadsl.ConnectHttp; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; @@ -57,7 +58,7 @@ public class WebSocketCoreExample { public HttpResponse apply(HttpRequest request) throws Exception { return handleRequest(request); } - }, "localhost", 8080, materializer); + }, ConnectHttp.toHost("localhost", 8080), materializer); // will throw if binding fails serverBindingFuture.toCompletableFuture().get(1, TimeUnit.SECONDS); diff --git a/akka-docs/rst/scala/stream/migration-guide-2.0-2.4-scala.rst b/akka-docs/rst/scala/stream/migration-guide-2.0-2.4-scala.rst index bc2def8486..996d179ea2 100644 --- a/akka-docs/rst/scala/stream/migration-guide-2.0-2.4-scala.rst +++ b/akka-docs/rst/scala/stream/migration-guide-2.0-2.4-scala.rst @@ -169,4 +169,23 @@ Previously we had a mix of methods and classes called ``websocket`` or ``Websock how the word is spelled in the spec and some other places of Akka HTTP. Methods and classes using the word WebSocket now consistently use it as ``WebSocket``, so updating is as simple as -find-and-replacing the lower-case ``s`` to an upper-case ``S`` wherever the word WebSocket appeared. \ No newline at end of file +find-and-replacing the lower-case ``s`` to an upper-case ``S`` wherever the word WebSocket appeared. + +Java DSL for Http binding and connections changed +------------------------------------------------- + +In order to minimise the number of needed overloads for each method defined on the ``Http`` extension +a new mini-DSL has been introduced for connecting to hosts given a hostname, port and optional ``ConnectionContext``. + +The availability of the connection context (if it's set to ``HttpsConnectionContext``) makes the server be bound +as an HTTPS server, and for outgoing connections those settings are used instead of the default ones if provided. + +Was:: + + http.cachedHostConnectionPool(toHost("akka.io"), materializer()); + http.cachedHostConnectionPool("akka.io", 80, httpsConnectionContext, materializer()); // does not work anymore + +Replace with:: + + http.cachedHostConnectionPool(toHostHttps("akka.io", 8081), materializer()); + http.cachedHostConnectionPool(toHostHttps("akka.io", 8081).withCustomHttpsContext(httpsContext), materializer()); diff --git a/akka-http-core/src/main/scala/akka/http/javadsl/ConnectHttp.scala b/akka-http-core/src/main/scala/akka/http/javadsl/ConnectHttp.scala index 97ddf6d6c2..08ea62479d 100644 --- a/akka-http-core/src/main/scala/akka/http/javadsl/ConnectHttp.scala +++ b/akka-http-core/src/main/scala/akka/http/javadsl/ConnectHttp.scala @@ -17,6 +17,12 @@ abstract class ConnectHttp { final def effectiveHttpsConnectionContext(fallbackContext: HttpsConnectionContext): HttpsConnectionContext = connectionContext.orElse(fallbackContext) + + final def effectiveConnectionContext(fallbackContext: ConnectionContext): ConnectionContext = + if (connectionContext.isPresent) connectionContext.get() + else fallbackContext + + override def toString = s"ConnectHttp($host,$port,$isHttps,$connectionContext)" } object ConnectHttp { @@ -35,7 +41,8 @@ object ConnectHttp { def toHost(host: String, port: Int): ConnectHttp = { require(port > 0, "port must be > 0") - toHost(Uri.create(host).port(port)) + val start = if (host.startsWith("http://") || host.startsWith("https://")) host else s"http://$host" + toHost(Uri.create(start).port(port)) } /** @@ -59,7 +66,8 @@ object ConnectHttp { @throws(classOf[IllegalArgumentException]) def toHostHttps(host: String, port: Int): ConnectWithHttps = { require(port > 0, "port must be > 0") - toHostHttps(Uri.create(host).port(port).host.address) + val start = if (host.startsWith("https://")) host else s"https://$host" + toHostHttps(Uri.create(s"$start").port(port)) } private def effectivePort(uri: Uri): Int = { diff --git a/akka-http-core/src/main/scala/akka/http/javadsl/Http.scala b/akka-http-core/src/main/scala/akka/http/javadsl/Http.scala index 0d65c379ce..3d0e2ee6ab 100644 --- a/akka-http-core/src/main/scala/akka/http/javadsl/Http.scala +++ b/akka-http-core/src/main/scala/akka/http/javadsl/Http.scala @@ -83,39 +83,25 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension { /** * Creates a [[Source]] of [[IncomingConnection]] instances which represents a prospective HTTP server binding * on the given `endpoint`. + * * If the given port is 0 the resulting source can be materialized several times. Each materialization will * then be assigned a new local port by the operating system, which can then be retrieved by the materialized * [[ServerBinding]]. + * * If the given port is non-zero subsequent materialization attempts of the produced source will immediately * fail, unless the first materialization has already been unbound. Unbinding can be triggered via the materialized * [[ServerBinding]]. + * + * The server will be bound using HTTPS if the [[ConnectHttp]] object is configured with an [[HttpsConnectionContext]], + * or the [[defaultServerHttpContext]] has been configured to be an [[HttpsConnectionContext]]. */ def bind(connect: ConnectHttp, materializer: Materializer): Source[IncomingConnection, CompletionStage[ServerBinding]] = { - val connectionContext = - if (connect.connectionContext.isPresent) connect.connectionContext.get() - else defaultServerHttpContext - - new Source(delegate.bind(connect.host, connect.port, connectionContext.asScala)(materializer) + val connectionContext = connect.effectiveConnectionContext(defaultServerHttpContext).asScala + new Source(delegate.bind(connect.host, connect.port, connectionContext)(materializer) .map(new IncomingConnection(_)) .mapMaterializedValue(_.map(new ServerBinding(_))(ec).toJava)) } - - /** - * Creates a [[Source]] of [[IncomingConnection]] instances which represents a prospective HTTP server binding - * on the given `endpoint`. - * If the given port is 0 the resulting source can be materialized several times. Each materialization will - * then be assigned a new local port by the operating system, which can then be retrieved by the materialized - * [[ServerBinding]]. - * If the given port is non-zero subsequent materialization attempts of the produced source will immediately - * fail, unless the first materialization has already been unbound. Unbinding can be triggered via the materialized - * [[ServerBinding]]. - */ - def bind(interface: String, port: Int, materializer: Materializer): Source[IncomingConnection, CompletionStage[ServerBinding]] = - new Source(delegate.bind(interface, port)(materializer) - .map(new IncomingConnection(_)) - .mapMaterializedValue(_.map(new ServerBinding(_))(ec).toJava)) - /** * Creates a [[Source]] of [[IncomingConnection]] instances which represents a prospective HTTP server binding * on the given `endpoint`. @@ -127,14 +113,18 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension { * If the given port is non-zero subsequent materialization attempts of the produced source will immediately * fail, unless the first materialization has already been unbound. Unbinding can be triggered via the materialized * [[ServerBinding]]. + * + * The server will be bound using HTTPS if the [[ConnectHttp]] object is configured with an [[HttpsConnectionContext]], + * or the [[defaultServerHttpContext]] has been configured to be an [[HttpsConnectionContext]]. */ - def bind(interface: String, port: Int, - connectionContext: ConnectionContext, + def bind(connect: ConnectHttp, settings: ServerSettings, - materializer: Materializer): Source[IncomingConnection, CompletionStage[ServerBinding]] = - new Source(delegate.bind(interface, port, settings = settings.asScala, connectionContext = ConnectionContext.noEncryption().asScala)(materializer) + materializer: Materializer): Source[IncomingConnection, CompletionStage[ServerBinding]] = { + val connectionContext = connect.effectiveConnectionContext(defaultServerHttpContext).asScala + new Source(delegate.bind(connect.host, connect.port, settings = settings.asScala, connectionContext = connectionContext)(materializer) .map(new IncomingConnection(_)) .mapMaterializedValue(_.map(new ServerBinding(_))(ec).toJava)) + } /** * Creates a [[Source]] of [[IncomingConnection]] instances which represents a prospective HTTP server binding @@ -147,34 +137,19 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension { * If the given port is non-zero subsequent materialization attempts of the produced source will immediately * fail, unless the first materialization has already been unbound. Unbinding can be triggered via the materialized * [[ServerBinding]]. - */ - def bind(interface: String, port: Int, - connectionContext: ConnectionContext, - materializer: Materializer): Source[IncomingConnection, CompletionStage[ServerBinding]] = - new Source(delegate.bind(interface, port, connectionContext = connectionContext.asScala)(materializer) - .map(new IncomingConnection(_)) - .mapMaterializedValue(_.map(new ServerBinding(_))(ec).toJava)) - - /** - * Creates a [[Source]] of [[IncomingConnection]] instances which represents a prospective HTTP server binding - * on the given `endpoint`. * - * If the given port is 0 the resulting source can be materialized several times. Each materialization will - * then be assigned a new local port by the operating system, which can then be retrieved by the materialized - * [[ServerBinding]]. - * - * If the given port is non-zero subsequent materialization attempts of the produced source will immediately - * fail, unless the first materialization has already been unbound. Unbinding can be triggered via the materialized - * [[ServerBinding]]. + * The server will be bound using HTTPS if the [[ConnectHttp]] object is configured with an [[HttpsConnectionContext]], + * or the [[defaultServerHttpContext]] has been configured to be an [[HttpsConnectionContext]]. */ - def bind(interface: String, port: Int, - connectionContext: ConnectionContext, + def bind(connect: ConnectHttp, settings: ServerSettings, log: LoggingAdapter, - materializer: Materializer): Source[IncomingConnection, CompletionStage[ServerBinding]] = - new Source(delegate.bind(interface, port, ConnectionContext.noEncryption().asScala, settings.asScala, log)(materializer) + materializer: Materializer): Source[IncomingConnection, CompletionStage[ServerBinding]] = { + val connectionContext = connect.effectiveConnectionContext(defaultServerHttpContext).asScala + new Source(delegate.bind(connect.host, connect.port, connectionContext, settings.asScala, log)(materializer) .map(new IncomingConnection(_)) .mapMaterializedValue(_.map(new ServerBinding(_))(ec).toJava)) + } /** * Convenience method which starts a new HTTP server at the given endpoint and uses the given `handler` @@ -182,13 +157,18 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension { * * The number of concurrently accepted connections can be configured by overriding * the `akka.http.server.max-connections` setting. + * + * The server will be bound using HTTPS if the [[ConnectHttp]] object is configured with an [[HttpsConnectionContext]], + * or the [[defaultServerHttpContext]] has been configured to be an [[HttpsConnectionContext]]. */ def bindAndHandle(handler: Flow[HttpRequest, HttpResponse, _], - interface: String, port: Int, - materializer: Materializer): CompletionStage[ServerBinding] = + connect: ConnectHttp, + materializer: Materializer): CompletionStage[ServerBinding] = { + val connectionContext = connect.effectiveConnectionContext(defaultServerHttpContext).asScala delegate.bindAndHandle(handler.asInstanceOf[Flow[sm.HttpRequest, sm.HttpResponse, _]].asScala, - interface, port)(materializer) + connect.host, connect.port, connectionContext)(materializer) .map(new ServerBinding(_))(ec).toJava + } /** * Convenience method which starts a new HTTP server at the given endpoint and uses the given `handler` @@ -196,31 +176,20 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension { * * The number of concurrently accepted connections can be configured by overriding * the `akka.http.server.max-connections` setting. - */ - def bindAndHandle(handler: Flow[HttpRequest, HttpResponse, _], - interface: String, port: Int, - connectionContext: ConnectionContext, - materializer: Materializer): CompletionStage[ServerBinding] = - delegate.bindAndHandle(handler.asInstanceOf[Flow[sm.HttpRequest, sm.HttpResponse, _]].asScala, - interface, port, connectionContext.asScala)(materializer) - .map(new ServerBinding(_))(ec).toJava - - /** - * Convenience method which starts a new HTTP server at the given endpoint and uses the given `handler` - * [[Flow]] for processing all incoming connections. * - * The number of concurrently accepted connections can be configured by overriding - * the `akka.http.server.max-connections` setting. + * The server will be bound using HTTPS if the [[ConnectHttp]] object is configured with an [[HttpsConnectionContext]], + * or the [[defaultServerHttpContext]] has been configured to be an [[HttpsConnectionContext]]. */ def bindAndHandle(handler: Flow[HttpRequest, HttpResponse, _], - interface: String, port: Int, + connect: ConnectHttp, settings: ServerSettings, - connectionContext: ConnectionContext, log: LoggingAdapter, - materializer: Materializer): CompletionStage[ServerBinding] = + materializer: Materializer): CompletionStage[ServerBinding] = { + val connectionContext = connect.effectiveConnectionContext(defaultServerHttpContext).asScala delegate.bindAndHandle(handler.asInstanceOf[Flow[sm.HttpRequest, sm.HttpResponse, _]].asScala, - interface, port, connectionContext.asScala, settings.asScala, log)(materializer) + connect.host, connect.port, connectionContext, settings.asScala, log)(materializer) .map(new ServerBinding(_))(ec).toJava + } /** * Convenience method which starts a new HTTP server at the given endpoint and uses the given `handler` @@ -228,12 +197,17 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension { * * The number of concurrently accepted connections can be configured by overriding * the `akka.http.server.max-connections` setting. + * + * The server will be bound using HTTPS if the [[ConnectHttp]] object is configured with an [[HttpsConnectionContext]], + * or the [[defaultServerHttpContext]] has been configured to be an [[HttpsConnectionContext]]. */ def bindAndHandleSync(handler: Function[HttpRequest, HttpResponse], - interface: String, port: Int, - materializer: Materializer): CompletionStage[ServerBinding] = - delegate.bindAndHandleSync(handler.apply(_).asScala, interface, port)(materializer) + connect: ConnectHttp, + materializer: Materializer): CompletionStage[ServerBinding] = { + val connectionContext = connect.effectiveConnectionContext(defaultServerHttpContext).asScala + delegate.bindAndHandleSync(handler.apply(_).asScala, connect.host, connect.port, connectionContext)(materializer) .map(new ServerBinding(_))(ec).toJava + } /** * Convenience method which starts a new HTTP server at the given endpoint and uses the given `handler` @@ -241,30 +215,20 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension { * * The number of concurrently accepted connections can be configured by overriding * the `akka.http.server.max-connections` setting. - */ - def bindAndHandleSync(handler: Function[HttpRequest, HttpResponse], - interface: String, port: Int, - connectionContext: ConnectionContext, - materializer: Materializer): CompletionStage[ServerBinding] = - delegate.bindAndHandleSync(handler.apply(_).asScala, interface, port, connectionContext.asScala)(materializer) - .map(new ServerBinding(_))(ec).toJava - - /** - * Convenience method which starts a new HTTP server at the given endpoint and uses the given `handler` - * [[Flow]] for processing all incoming connections. * - * The number of concurrently accepted connections can be configured by overriding - * the `akka.http.server.max-connections` setting. + * The server will be bound using HTTPS if the [[ConnectHttp]] object is configured with an [[HttpsConnectionContext]], + * or the [[defaultServerHttpContext]] has been configured to be an [[HttpsConnectionContext]]. */ def bindAndHandleSync(handler: Function[HttpRequest, HttpResponse], - interface: String, port: Int, + connect: ConnectHttp, settings: ServerSettings, - connectionContext: ConnectionContext, log: LoggingAdapter, - materializer: Materializer): CompletionStage[ServerBinding] = + materializer: Materializer): CompletionStage[ServerBinding] = { + val connectionContext = connect.effectiveConnectionContext(defaultServerHttpContext).asScala delegate.bindAndHandleSync(handler.apply(_).asScala, - interface, port, connectionContext.asScala, settings.asScala, log)(materializer) + connect.host, connect.port, connectionContext, settings.asScala, log)(materializer) .map(new ServerBinding(_))(ec).toJava + } /** * Convenience method which starts a new HTTP server at the given endpoint and uses the given `handler` @@ -272,12 +236,17 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension { * * The number of concurrently accepted connections can be configured by overriding * the `akka.http.server.max-connections` setting. + * + * The server will be bound using HTTPS if the [[ConnectHttp]] object is configured with an [[HttpsConnectionContext]], + * or the [[defaultServerHttpContext]] has been configured to be an [[HttpsConnectionContext]]. */ def bindAndHandleAsync(handler: Function[HttpRequest, CompletionStage[HttpResponse]], - interface: String, port: Int, - materializer: Materializer): CompletionStage[ServerBinding] = - delegate.bindAndHandleAsync(handler.apply(_).toScala, interface, port)(materializer) + connect: ConnectHttp, + materializer: Materializer): CompletionStage[ServerBinding] = { + val connectionContext = connect.effectiveConnectionContext(defaultServerHttpContext).asScala + delegate.bindAndHandleAsync(handler.apply(_).toScala, connect.host, connect.port, connectionContext)(materializer) .map(new ServerBinding(_))(ec).toJava + } /** * Convenience method which starts a new HTTP server at the given endpoint and uses the given `handler` @@ -285,29 +254,20 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension { * * The number of concurrently accepted connections can be configured by overriding * the `akka.http.server.max-connections` setting. - */ - def bindAndHandleAsync(handler: Function[HttpRequest, CompletionStage[HttpResponse]], - interface: String, port: Int, - connectionContext: ConnectionContext, - materializer: Materializer): CompletionStage[ServerBinding] = - delegate.bindAndHandleAsync(handler.apply(_).toScala, interface, port, connectionContext.asScala)(materializer) - .map(new ServerBinding(_))(ec).toJava - - /** - * Convenience method which starts a new HTTP server at the given endpoint and uses the given `handler` - * [[Flow]] for processing all incoming connections. * - * The number of concurrently accepted connections can be configured by overriding - * the `akka.http.server.max-connections` setting. + * The server will be bound using HTTPS if the [[ConnectHttp]] object is configured with an [[HttpsConnectionContext]], + * or the [[defaultServerHttpContext]] has been configured to be an [[HttpsConnectionContext]]. */ def bindAndHandleAsync(handler: Function[HttpRequest, CompletionStage[HttpResponse]], - interface: String, port: Int, - settings: ServerSettings, connectionContext: ConnectionContext, + connect: ConnectHttp, + settings: ServerSettings, parallelism: Int, log: LoggingAdapter, - materializer: Materializer): CompletionStage[ServerBinding] = + materializer: Materializer): CompletionStage[ServerBinding] = { + val connectionContext = connect.effectiveConnectionContext(defaultServerHttpContext).asScala delegate.bindAndHandleAsync(handler.apply(_).toScala, - interface, port, connectionContext.asScala, settings.asScala, parallelism, log)(materializer) + connect.host, connect.port, connectionContext, settings.asScala, parallelism, log)(materializer) .map(new ServerBinding(_))(ec).toJava + } /** * Constructs a client layer stage using the configured default [[akka.http.javadsl.settings.ClientConnectionSettings]]. @@ -355,16 +315,15 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension { * Creates a [[Flow]] representing a prospective HTTP client connection to the given endpoint. * Every materialization of the produced flow will attempt to establish a new outgoing connection. */ - def outgoingConnection(host: String, port: Int, - connectionContext: ConnectionContext, + def outgoingConnection(to: ConnectHttp, localAddress: Optional[InetSocketAddress], settings: ClientConnectionSettings, log: LoggingAdapter): Flow[HttpRequest, HttpResponse, CompletionStage[OutgoingConnection]] = adaptOutgoingFlow { - connectionContext match { - case https: HttpsConnectionContext ⇒ delegate.outgoingConnectionHttps(host, port, https.asScala, localAddress.asScala, settings.asScala, log) - case _ ⇒ delegate.outgoingConnection(host, port, localAddress.asScala, settings.asScala, log) - } + if (to.isHttps) + delegate.outgoingConnectionHttps(to.host, to.port, to.effectiveConnectionContext(defaultClientHttpsContext).asInstanceOf[HttpsConnectionContext].asScala, localAddress.asScala, settings.asScala, log) + else + delegate.outgoingConnection(to.host, to.port, localAddress.asScala, settings.asScala, log) } /** @@ -414,7 +373,7 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension { case https: HttpsConnectionContext ⇒ delegate.newHostConnectionPoolHttps[T](to.host, to.port, https.asScala, settings.asScala, log)(materializer) .mapMaterializedValue(_.toJava) - case _ ⇒ + case _ ⇒ delegate.newHostConnectionPool[T](to.host, to.port, settings.asScala, log)(materializer) .mapMaterializedValue(_.toJava) } @@ -682,7 +641,7 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension { /** * Gets the default - * + * * @return */ def defaultServerHttpContext: ConnectionContext = diff --git a/akka-http-core/src/main/scala/akka/http/scaladsl/model/headers/headers.scala b/akka-http-core/src/main/scala/akka/http/scaladsl/model/headers/headers.scala index e78293bfa1..9f2d14bd46 100644 --- a/akka-http-core/src/main/scala/akka/http/scaladsl/model/headers/headers.scala +++ b/akka-http-core/src/main/scala/akka/http/scaladsl/model/headers/headers.scala @@ -896,7 +896,7 @@ final case class `WWW-Authenticate`(challenges: immutable.Seq[HttpChallenge]) ex } // http://en.wikipedia.org/wiki/X-Forwarded-For -object `X-Forwarded-For` extends ModeledCompanion[`X-Forwarded-For`] { +object `X-Forwarded-For` extends ModeledCompanion[`X-Forwarded-For`] { def apply(first: RemoteAddress, more: RemoteAddress*): `X-Forwarded-For` = apply(immutable.Seq(first +: more: _*)) implicit val addressesRenderer = Renderer.defaultSeqRenderer[RemoteAddress] // cache } @@ -911,9 +911,9 @@ final case class `X-Forwarded-For`(addresses: immutable.Seq[RemoteAddress]) exte def getAddresses: Iterable[jm.RemoteAddress] = addresses.asJava } -object `X-Real-Ip` extends ModeledCompanion[`X-Real-Ip`] -final case class `X-Real-Ip`(address:RemoteAddress) extends jm.headers.XRealIp +object `X-Real-Ip` extends ModeledCompanion[`X-Real-Ip`] +final case class `X-Real-Ip`(address: RemoteAddress) extends jm.headers.XRealIp with RequestHeader { def renderValue[R <: Rendering](r: R): r.type = r ~~ address - protected def companion = `X-Real-Ip` + protected def companion = `X-Real-Ip` } diff --git a/akka-http-core/src/test/java/akka/http/javadsl/model/JavaTestServer.java b/akka-http-core/src/test/java/akka/http/javadsl/model/JavaTestServer.java index 7f3ce29d7c..f6c3f599a2 100644 --- a/akka-http-core/src/test/java/akka/http/javadsl/model/JavaTestServer.java +++ b/akka-http-core/src/test/java/akka/http/javadsl/model/JavaTestServer.java @@ -6,9 +6,9 @@ package akka.http.javadsl.model; import akka.NotUsed; import akka.actor.ActorSystem; +import akka.http.javadsl.ConnectHttp; import akka.http.javadsl.Http; import akka.http.javadsl.ServerBinding; -import akka.japi.Function; import akka.http.javadsl.model.ws.Message; import akka.http.javadsl.model.ws.TextMessage; import akka.http.javadsl.model.ws.WebSocket; @@ -17,9 +17,6 @@ import akka.stream.ActorMaterializer; import akka.stream.Materializer; import akka.stream.javadsl.Flow; import akka.stream.javadsl.Source; -import scala.concurrent.Await; -import scala.concurrent.Future; -import scala.concurrent.duration.FiniteDuration; import java.io.BufferedReader; import java.io.InputStreamReader; @@ -35,18 +32,16 @@ public class JavaTestServer { CompletionStage serverBindingFuture = Http.get(system).bindAndHandleSync( - new Function() { - public HttpResponse apply(HttpRequest request) throws Exception { - System.out.println("Handling request to " + request.getUri()); + request -> { + System.out.println("Handling request to " + request.getUri()); - if (request.getUri().path().equals("/")) - return WebSocket.handleWebSocketRequestWith(request, echoMessages()); - else if (request.getUri().path().equals("/greeter")) - return WebSocket.handleWebSocketRequestWith(request, greeter()); - else - return JavaApiTestCases.handleRequest(request); - } - }, "localhost", 8080, materializer); + if (request.getUri().path().equals("/")) + return WebSocket.handleWebSocketRequestWith(request, echoMessages()); + else if (request.getUri().path().equals("/greeter")) + return WebSocket.handleWebSocketRequestWith(request, greeter()); + else + return JavaApiTestCases.handleRequest(request); + }, ConnectHttp.toHost("localhost", 8080), materializer); serverBindingFuture.toCompletableFuture().get(1, TimeUnit.SECONDS); // will throw if binding fails System.out.println("Press ENTER to stop."); diff --git a/akka-http-core/src/test/scala/akka/http/javadsl/HttpExtensionApiSpec.scala b/akka-http-core/src/test/scala/akka/http/javadsl/HttpExtensionApiSpec.scala index dce742fae2..c34ebf7921 100644 --- a/akka-http-core/src/test/scala/akka/http/javadsl/HttpExtensionApiSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/javadsl/HttpExtensionApiSpec.scala @@ -9,6 +9,7 @@ import java.util.concurrent.{ CompletionStage, TimeUnit, CompletableFuture } import akka.NotUsed import akka.http.impl.util.JavaMapping.HttpsConnectionContext +import akka.http.javadsl.ConnectHttp._ import akka.http.javadsl.model.ws._ import akka.http.javadsl.settings.{ ClientConnectionSettings, ConnectionPoolSettings, ServerSettings } import akka.japi.Pair @@ -72,7 +73,7 @@ class HttpExtensionApiSpec extends WordSpec with Matchers with BeforeAndAfterAll "properly bind a server (with three parameters)" in { val (_, host, port) = TestUtils.temporaryServerHostnameAndPort() val probe = TestSubscriber.manualProbe[IncomingConnection]() - val binding = http.bind(host, port, materializer) + val binding = http.bind(toHost(host, port), materializer) .toMat(Sink.fromSubscriber(probe), Keep.left) .run(materializer) val sub = probe.expectSubscription() @@ -83,7 +84,7 @@ class HttpExtensionApiSpec extends WordSpec with Matchers with BeforeAndAfterAll "properly bind a server (with four parameters)" in { val (_, host, port) = TestUtils.temporaryServerHostnameAndPort() val probe = TestSubscriber.manualProbe[IncomingConnection]() - val binding = http.bind(host, port, connectionContext, materializer) + val binding = http.bind(toHost(host, port), materializer) .toMat(Sink.fromSubscriber(probe), Keep.left) .run(materializer) val sub = probe.expectSubscription() @@ -94,7 +95,7 @@ class HttpExtensionApiSpec extends WordSpec with Matchers with BeforeAndAfterAll "properly bind a server (with five parameters)" in { val (_, host, port) = TestUtils.temporaryServerHostnameAndPort() val probe = TestSubscriber.manualProbe[IncomingConnection]() - val binding = http.bind(host, port, connectionContext, serverSettings, materializer) + val binding = http.bind(toHost(host, port), serverSettings, materializer) .toMat(Sink.fromSubscriber(probe), Keep.left) .run(materializer) val sub = probe.expectSubscription() @@ -105,7 +106,7 @@ class HttpExtensionApiSpec extends WordSpec with Matchers with BeforeAndAfterAll "properly bind a server (with six parameters)" in { val (_, host, port) = TestUtils.temporaryServerHostnameAndPort() val probe = TestSubscriber.manualProbe[IncomingConnection]() - val binding = http.bind(host, port, connectionContext, serverSettings, loggingAdapter, materializer) + val binding = http.bind(toHost(host, port), serverSettings, loggingAdapter, materializer) .toMat(Sink.fromSubscriber(probe), Keep.left) .run(materializer) val sub = probe.expectSubscription() @@ -119,9 +120,9 @@ class HttpExtensionApiSpec extends WordSpec with Matchers with BeforeAndAfterAll val flow: Flow[HttpRequest, HttpResponse, NotUsed] = akka.stream.scaladsl.Flow[HttpRequest] .map(req ⇒ HttpResponse.create()) .asJava - val binding = http.bindAndHandle(flow, host, port, materializer) + val binding = http.bindAndHandle(flow, toHost(host, port), materializer) - val (_, completion) = http.outgoingConnection(ConnectHttp.toHost(host, port)) + val (_, completion) = http.outgoingConnection(toHost(host, port)) .runWith(Source.single(HttpRequest.create("/abc")), Sink.head(), materializer).toScala waitFor(completion) @@ -133,9 +134,9 @@ class HttpExtensionApiSpec extends WordSpec with Matchers with BeforeAndAfterAll val flow: Flow[HttpRequest, HttpResponse, NotUsed] = akka.stream.scaladsl.Flow[HttpRequest] .map(req ⇒ HttpResponse.create()) .asJava - val binding = http.bindAndHandle(flow, host, port, connectionContext, materializer) + val binding = http.bindAndHandle(flow, toHost(host, port), materializer) - val (_, completion) = http.outgoingConnection(ConnectHttp.toHost(host, port)) + val (_, completion) = http.outgoingConnection(toHost(host, port)) .runWith(Source.single(HttpRequest.create("/abc")), Sink.head(), materializer).toScala waitFor(completion) @@ -147,9 +148,9 @@ class HttpExtensionApiSpec extends WordSpec with Matchers with BeforeAndAfterAll val flow: Flow[HttpRequest, HttpResponse, NotUsed] = akka.stream.scaladsl.Flow[HttpRequest] .map(req ⇒ HttpResponse.create()) .asJava - val binding = http.bindAndHandle(flow, host, port, serverSettings, connectionContext, loggingAdapter, materializer) + val binding = http.bindAndHandle(flow, toHost(host, port), serverSettings, loggingAdapter, materializer) - val (_, completion) = http.outgoingConnection(ConnectHttp.toHost(host, port)) + val (_, completion) = http.outgoingConnection(toHost(host, port)) .runWith(Source.single(HttpRequest.create("/abc")), Sink.head(), materializer).toScala waitFor(completion) @@ -158,7 +159,7 @@ class HttpExtensionApiSpec extends WordSpec with Matchers with BeforeAndAfterAll "properly bind and handle a server with a synchronous function (with four parameters)" in { val (_, host, port) = TestUtils.temporaryServerHostnameAndPort() - val binding = http.bindAndHandleSync(httpSuccessFunction, host, port, materializer) + val binding = http.bindAndHandleSync(httpSuccessFunction, toHost(host, port), materializer) val response = http.singleRequest(HttpRequest.create(s"http://$host:$port/").withMethod(HttpMethods.GET), materializer) @@ -168,7 +169,7 @@ class HttpExtensionApiSpec extends WordSpec with Matchers with BeforeAndAfterAll "properly bind and handle a server with a synchronous function (with five parameters)" in { val (_, host, port) = TestUtils.temporaryServerHostnameAndPort() - val binding = http.bindAndHandleSync(httpSuccessFunction, host, port, connectionContext, materializer) + val binding = http.bindAndHandleSync(httpSuccessFunction, toHost(host, port), materializer) val response = http.singleRequest(HttpRequest.create(s"http://$host:$port/").withMethod(HttpMethods.GET), materializer) @@ -178,7 +179,7 @@ class HttpExtensionApiSpec extends WordSpec with Matchers with BeforeAndAfterAll "properly bind and handle a server with a synchronous (with seven parameters)" in { val (_, host, port) = TestUtils.temporaryServerHostnameAndPort() - val binding = http.bindAndHandleSync(httpSuccessFunction, host, port, serverSettings, connectionContext, loggingAdapter, materializer) + val binding = http.bindAndHandleSync(httpSuccessFunction, toHost(host, port), serverSettings, loggingAdapter, materializer) val response = http.singleRequest(HttpRequest.create(s"http://$host:$port/").withMethod(HttpMethods.GET), materializer) @@ -188,7 +189,7 @@ class HttpExtensionApiSpec extends WordSpec with Matchers with BeforeAndAfterAll "properly bind and handle a server with an asynchronous function (with four parameters)" in { val (_, host, port) = TestUtils.temporaryServerHostnameAndPort() - val binding = http.bindAndHandleAsync(asyncHttpSuccessFunction, host, port, materializer) + val binding = http.bindAndHandleAsync(asyncHttpSuccessFunction, toHost(host, port), materializer) val response = http.singleRequest(HttpRequest.create(s"http://$host:$port/").withMethod(HttpMethods.GET), materializer) @@ -198,7 +199,7 @@ class HttpExtensionApiSpec extends WordSpec with Matchers with BeforeAndAfterAll "properly bind and handle a server with an asynchronous function (with five parameters)" in { val (_, host, port) = TestUtils.temporaryServerHostnameAndPort() - val binding = http.bindAndHandleAsync(asyncHttpSuccessFunction, host, port, connectionContext, materializer) + val binding = http.bindAndHandleAsync(asyncHttpSuccessFunction, toHost(host, port), materializer) val response = http.singleRequest(HttpRequest.create(s"http://$host:$port/").withMethod(HttpMethods.GET), materializer) @@ -208,7 +209,7 @@ class HttpExtensionApiSpec extends WordSpec with Matchers with BeforeAndAfterAll "properly bind and handle a server with an asynchronous function (with eight parameters)" in { val (_, host, port) = TestUtils.temporaryServerHostnameAndPort() - val binding = http.bindAndHandleAsync(asyncHttpSuccessFunction, host, port, serverSettings, connectionContext, 1, loggingAdapter, materializer) + val binding = http.bindAndHandleAsync(asyncHttpSuccessFunction, toHost(host, port), serverSettings, 1, loggingAdapter, materializer) val response = http.singleRequest(HttpRequest.create(s"http://$host:$port/").withMethod(HttpMethods.GET), materializer) @@ -236,7 +237,7 @@ class HttpExtensionApiSpec extends WordSpec with Matchers with BeforeAndAfterAll val (host, port, binding) = runServer() val poolFlow: Flow[Pair[HttpRequest, NotUsed], Pair[Try[HttpResponse], NotUsed], HostConnectionPool] = - http.cachedHostConnectionPool[NotUsed](ConnectHttp.toHost(host, port), materializer) + http.cachedHostConnectionPool[NotUsed](toHost(host, port), materializer) val pair: Pair[HostConnectionPool, CompletionStage[Pair[Try[HttpResponse], NotUsed]]] = Source.single(new Pair(HttpRequest.GET(s"http://$host:$port/"), NotUsed.getInstance())) @@ -256,7 +257,7 @@ class HttpExtensionApiSpec extends WordSpec with Matchers with BeforeAndAfterAll val (host, port, binding) = runServer() val poolFlow: Flow[Pair[HttpRequest, NotUsed], Pair[Try[HttpResponse], NotUsed], HostConnectionPool] = - http.cachedHostConnectionPool[NotUsed](ConnectHttp.toHost(host, port), poolSettings, loggingAdapter, materializer) + http.cachedHostConnectionPool[NotUsed](toHost(host, port), poolSettings, loggingAdapter, materializer) val pair: Pair[HostConnectionPool, CompletionStage[Pair[Try[HttpResponse], NotUsed]]] = Source.single(new Pair(HttpRequest.GET(s"http://$host:$port/"), NotUsed.getInstance())) @@ -291,7 +292,7 @@ class HttpExtensionApiSpec extends WordSpec with Matchers with BeforeAndAfterAll "create a host connection pool (with a ConnectHttp and a Materializer)" in { val (host, port, binding) = runServer() - val poolFlow = http.newHostConnectionPool[NotUsed](ConnectHttp.toHost(host, port), materializer) + val poolFlow = http.newHostConnectionPool[NotUsed](toHost(host, port), materializer) val pair: Pair[HostConnectionPool, CompletionStage[Pair[Try[HttpResponse], NotUsed]]] = Source.single(new Pair(get(host, port), NotUsed.getInstance())) @@ -311,7 +312,7 @@ class HttpExtensionApiSpec extends WordSpec with Matchers with BeforeAndAfterAll pending val (host, port, binding) = runServer() - val poolFlow = http.newHostConnectionPool[NotUsed](ConnectHttp.toHost(host, port), poolSettings, loggingAdapter, materializer) + val poolFlow = http.newHostConnectionPool[NotUsed](toHost(host, port), poolSettings, loggingAdapter, materializer) val pair: Pair[HostConnectionPool, CompletionStage[Pair[Try[HttpResponse], NotUsed]]] = Source.single(new Pair(get(host, port), NotUsed.getInstance())) @@ -369,7 +370,7 @@ class HttpExtensionApiSpec extends WordSpec with Matchers with BeforeAndAfterAll "create an outgoing connection (with a ConnectHttp)" in { val (host, port, binding) = runServer() - val flow = http.outgoingConnection(ConnectHttp.toHost(host, port)) + val flow = http.outgoingConnection(toHost(host, port)) val response = Source.single(get(host, port)) .via(flow) @@ -382,10 +383,9 @@ class HttpExtensionApiSpec extends WordSpec with Matchers with BeforeAndAfterAll "create an outgoing connection (with 6 parameters)" in { val (host, port, binding) = runServer() + println("host = " + host) val flow = http.outgoingConnection( - host, - port, - connectionContext, + toHost(host, port), Optional.empty(), ClientConnectionSettings.create(system), NoLogging) @@ -455,7 +455,7 @@ class HttpExtensionApiSpec extends WordSpec with Matchers with BeforeAndAfterAll def runServer(): (Host, Port, ServerBinding) = { val (_, host, port) = TestUtils.temporaryServerHostnameAndPort() - val server = http.bindAndHandleSync(httpSuccessFunction, host, port, materializer) + val server = http.bindAndHandleSync(httpSuccessFunction, toHost(host, port), materializer) (host, port, waitFor(server)) } @@ -467,7 +467,7 @@ class HttpExtensionApiSpec extends WordSpec with Matchers with BeforeAndAfterAll override def apply(request: HttpRequest): HttpResponse = { WebSocket.handleWebSocketRequestWith(request, Flow.create[Message]()) } - }, host, port, materializer) + }, toHost(host, port), materializer) (host, port, waitFor(server)) } diff --git a/akka-http-tests/src/test/java/akka/http/javadsl/client/HttpAPIsTest.java b/akka-http-tests/src/test/java/akka/http/javadsl/client/HttpAPIsTest.java index c5396fb212..f1448eaee9 100644 --- a/akka-http-tests/src/test/java/akka/http/javadsl/client/HttpAPIsTest.java +++ b/akka-http-tests/src/test/java/akka/http/javadsl/client/HttpAPIsTest.java @@ -36,22 +36,22 @@ public class HttpAPIsTest extends JUnitRouteTest { ConnectionPoolSettings conSettings = null; LoggingAdapter log = null; - http.bind("127.0.0.1", 8080, materializer()); - http.bind("127.0.0.1", 8080, connectionContext, materializer()); - http.bind("127.0.0.1", 8080, httpContext, materializer()); - http.bind("127.0.0.1", 8080, httpsContext, materializer()); + http.bind(toHost("127.0.0.1", 8080), materializer()); + http.bind(toHost("127.0.0.1", 8080), materializer()); + http.bind(toHostHttps("127.0.0.1", 8080), materializer()); final Flow handler = null; - http.bindAndHandle(handler, "127.0.0.1", 8080, materializer()); - http.bindAndHandle(handler, "127.0.0.1", 8080, httpsContext, materializer()); + http.bindAndHandle(handler, toHost("127.0.0.1", 8080), materializer()); + http.bindAndHandle(handler, toHost("127.0.0.1", 8080), materializer()); + http.bindAndHandle(handler, toHostHttps("127.0.0.1", 8080).withCustomHttpsContext(httpsContext), materializer()); final Function> handler1 = null; - http.bindAndHandleAsync(handler1, "127.0.0.1", 8080, materializer()); - http.bindAndHandleAsync(handler1, "127.0.0.1", 8080, httpsContext, materializer()); + http.bindAndHandleAsync(handler1, toHost("127.0.0.1", 8080), materializer()); + http.bindAndHandleAsync(handler1, toHostHttps("127.0.0.1", 8080), materializer()); final Function handler2 = null; - http.bindAndHandleSync(handler2, "127.0.0.1", 8080, materializer()); - http.bindAndHandleSync(handler2, "127.0.0.1", 8080, httpsContext, materializer()); + http.bindAndHandleSync(handler2, toHost("127.0.0.1", 8080), materializer()); + http.bindAndHandleSync(handler2, toHostHttps("127.0.0.1", 8080), materializer()); final HttpRequest handler3 = null; http.singleRequest(handler3, materializer()); diff --git a/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala b/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala index ce0d77aa50..e2f5f369b8 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala @@ -363,12 +363,12 @@ object StreamLayout { } final case class CompositeModule( - override val subModules: Set[Module], - override val shape: Shape, - override val downstreams: Map[OutPort, InPort], - override val upstreams: Map[InPort, OutPort], - override val materializedValueComputation: MaterializedValueNode, - override val attributes: Attributes) extends Module { + override val subModules: Set[Module], + override val shape: Shape, + override val downstreams: Map[OutPort, InPort], + override val upstreams: Map[InPort, OutPort], + override val materializedValueComputation: MaterializedValueNode, + override val attributes: Attributes) extends Module { override def replaceShape(s: Shape): Module = if (s != shape) { @@ -395,13 +395,13 @@ object StreamLayout { } final case class FusedModule( - override val subModules: Set[Module], - override val shape: Shape, - override val downstreams: Map[OutPort, InPort], - override val upstreams: Map[InPort, OutPort], - override val materializedValueComputation: MaterializedValueNode, - override val attributes: Attributes, - info: Fusing.StructuralInfo) extends Module { + override val subModules: Set[Module], + override val shape: Shape, + override val downstreams: Map[OutPort, InPort], + override val upstreams: Map[InPort, OutPort], + override val materializedValueComputation: MaterializedValueNode, + override val attributes: Attributes, + info: Fusing.StructuralInfo) extends Module { override def isFused: Boolean = true diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index f3bbcee4ed..c2d684f505 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -347,7 +347,7 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends * @see [[#mapAsyncUnordered]] */ def mapAsync[T](parallelism: Int, f: function.Function[Out, CompletionStage[T]]): javadsl.Flow[In, T, Mat] = - new Flow(delegate.mapAsync(parallelism)(x => f(x).toScala)) + new Flow(delegate.mapAsync(parallelism)(x ⇒ f(x).toScala)) /** * Transform this stream by applying the given function to each of the elements @@ -379,7 +379,7 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends * @see [[#mapAsync]] */ def mapAsyncUnordered[T](parallelism: Int, f: function.Function[Out, CompletionStage[T]]): javadsl.Flow[In, T, Mat] = - new Flow(delegate.mapAsyncUnordered(parallelism)(x => f(x).toScala)) + new Flow(delegate.mapAsyncUnordered(parallelism)(x ⇒ f(x).toScala)) /** * Only pass on those elements that satisfy the given predicate. @@ -1461,7 +1461,7 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends def zipMat[T, M, M2](that: Graph[SourceShape[T], M], matF: function.Function2[Mat, M, M2]): javadsl.Flow[In, Out @uncheckedVariance Pair T, M2] = this.viaMat(Flow.fromGraph(GraphDSL.create(that, - new function.Function2[GraphDSL.Builder[M], SourceShape[T], FlowShape[Out, Out @uncheckedVariance Pair T]] { + new function.Function2[GraphDSL.Builder[M], SourceShape[T], FlowShape[Out, Out @ uncheckedVariance Pair T]] { def apply(b: GraphDSL.Builder[M], s: SourceShape[T]): FlowShape[Out, Out @uncheckedVariance Pair T] = { val zip: FanInShape2[Out, T, Out Pair T] = b.add(Zip.create[Out, T]) b.from(s).toInlet(zip.in1) @@ -1638,7 +1638,7 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends * downstream. */ def watchTermination[M]()(matF: function.Function2[Mat, CompletionStage[Done], M]): javadsl.Flow[In, Out, M] = - new Flow(delegate.watchTermination()((left, right) => matF(left, right.toJava))) + new Flow(delegate.watchTermination()((left, right) ⇒ matF(left, right.toJava))) /** * Delays the initial element by the specified duration. diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 178157febd..a4cfe336f7 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -838,7 +838,7 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap * @see [[#mapAsyncUnordered]] */ def mapAsync[T](parallelism: Int, f: function.Function[Out, CompletionStage[T]]): javadsl.Source[T, Mat] = - new Source(delegate.mapAsync(parallelism)(x => f(x).toScala)) + new Source(delegate.mapAsync(parallelism)(x ⇒ f(x).toScala)) /** * Transform this stream by applying the given function to each of the elements @@ -870,7 +870,7 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap * @see [[#mapAsync]] */ def mapAsyncUnordered[T](parallelism: Int, f: function.Function[Out, CompletionStage[T]]): javadsl.Source[T, Mat] = - new Source(delegate.mapAsyncUnordered(parallelism)(x => f(x).toScala)) + new Source(delegate.mapAsyncUnordered(parallelism)(x ⇒ f(x).toScala)) /** * Only pass on those elements that satisfy the given predicate. @@ -1777,7 +1777,7 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap * downstream. */ def watchTermination[M]()(matF: function.Function2[Mat, CompletionStage[Done], M]): javadsl.Source[Out, M] = - new Source(delegate.watchTermination()((left, right) => matF(left, right.toJava))) + new Source(delegate.watchTermination()((left, right) ⇒ matF(left, right.toJava))) /** * Delays the initial element by the specified duration. diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala index 26ea46a27a..cf2a7fbbcd 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala @@ -201,7 +201,7 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo * @see [[#mapAsyncUnordered]] */ def mapAsync[T](parallelism: Int, f: function.Function[Out, CompletionStage[T]]): SubFlow[In, T, Mat] = - new SubFlow(delegate.mapAsync(parallelism)(x => f(x).toScala)) + new SubFlow(delegate.mapAsync(parallelism)(x ⇒ f(x).toScala)) /** * Transform this stream by applying the given function to each of the elements @@ -233,7 +233,7 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo * @see [[#mapAsync]] */ def mapAsyncUnordered[T](parallelism: Int, f: function.Function[Out, CompletionStage[T]]): SubFlow[In, T, Mat] = - new SubFlow(delegate.mapAsyncUnordered(parallelism)(x => f(x).toScala)) + new SubFlow(delegate.mapAsyncUnordered(parallelism)(x ⇒ f(x).toScala)) /** * Only pass on those elements that satisfy the given predicate. diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala index 96b8dce5b2..fc88a40f7b 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala @@ -199,7 +199,7 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source * @see [[#mapAsyncUnordered]] */ def mapAsync[T](parallelism: Int, f: function.Function[Out, CompletionStage[T]]): SubSource[T, Mat] = - new SubSource(delegate.mapAsync(parallelism)(x => f(x).toScala)) + new SubSource(delegate.mapAsync(parallelism)(x ⇒ f(x).toScala)) /** * Transform this stream by applying the given function to each of the elements @@ -231,7 +231,7 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source * @see [[#mapAsync]] */ def mapAsyncUnordered[T](parallelism: Int, f: function.Function[Out, CompletionStage[T]]): SubSource[T, Mat] = - new SubSource(delegate.mapAsyncUnordered(parallelism)(x => f(x).toScala)) + new SubSource(delegate.mapAsyncUnordered(parallelism)(x ⇒ f(x).toScala)) /** * Only pass on those elements that satisfy the given predicate. diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala index b94777b096..e8086ac2c0 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala @@ -1009,7 +1009,7 @@ object GraphDSL extends GraphApply { } private class PortOpsImpl[+Out](override val outlet: Outlet[Out @uncheckedVariance], b: Builder[_]) - extends PortOps[Out] { + extends PortOps[Out] { override def withAttributes(attr: Attributes): Repr[Out] = throw settingAttrNotSupported override def addAttributes(attr: Attributes): Repr[Out] = throw settingAttrNotSupported