diff --git a/akka-docs-dev/rst/java/code/docs/http/javadsl/HttpClientExampleDocTest.java b/akka-docs-dev/rst/java/code/docs/http/javadsl/HttpClientExampleDocTest.java index da3481f9f4..d654f8bd7f 100644 --- a/akka-docs-dev/rst/java/code/docs/http/javadsl/HttpClientExampleDocTest.java +++ b/akka-docs-dev/rst/java/code/docs/http/javadsl/HttpClientExampleDocTest.java @@ -6,12 +6,8 @@ package docs.http.javadsl; import akka.actor.ActorSystem; import akka.http.javadsl.HostConnectionPool; -import akka.japi.Option; import akka.japi.Pair; -import akka.util.ByteString; -import org.junit.Test; -import scala.Tuple2; import scala.concurrent.Future; import akka.stream.ActorMaterializer; import akka.stream.javadsl.*; @@ -44,21 +40,20 @@ public class HttpClientExampleDocTest { final ActorSystem system = ActorSystem.create(); final ActorMaterializer materializer = ActorMaterializer.create(system); - // construct a pool client flow with context type `Int` - // TODO these Tuple2 will be changed to akka.japi.Pair + // construct a pool client flow with context type `Integer` final Flow< - Tuple2, - Tuple2, Integer>, + Pair, + Pair, Integer>, HostConnectionPool> poolClientFlow = Http.get(system).cachedHostConnectionPool("akka.io", 80, materializer); - // construct a pool client flow with context type `Int` + // construct a pool client flow with context type `Integer` - final Future, Integer>> responseFuture = + final Future, Integer>> responseFuture = Source - .single(Pair.create(HttpRequest.create("/"), 42).toScala()) + .single(Pair.create(HttpRequest.create("/"), 42)) .via(poolClientFlow) - .runWith(Sink., Integer>>head(), materializer); + .runWith(Sink., Integer>>head(), materializer); //#host-level-example } diff --git a/akka-http-core/src/main/scala/akka/http/impl/util/JavaMapping.scala b/akka-http-core/src/main/scala/akka/http/impl/util/JavaMapping.scala index c2bdf4c2ed..ca4f6a5c74 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/util/JavaMapping.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/util/JavaMapping.scala @@ -7,6 +7,7 @@ package akka.http.impl.util import java.net.InetAddress import java.{ util ⇒ ju, lang ⇒ jl } import akka.http.scaladsl.model.ws.Message +import akka.japi.Pair import akka.stream.javadsl import akka.stream.scaladsl @@ -17,13 +18,16 @@ import akka.http.impl.model.JavaUri import akka.http.javadsl.{ model ⇒ jm } import akka.http.scaladsl.{ model ⇒ sm } +import scala.util.Try + /** INTERNAL API */ -trait J2SMapping[J] { +private[http] trait J2SMapping[J] { type S def toScala(javaObject: J): S } + /** INTERNAL API */ -object J2SMapping { +private[http] object J2SMapping { implicit def fromJavaMapping[J](implicit mapping: JavaMapping[J, _]): J2SMapping[J] { type S = mapping.S } = mapping implicit def seqMapping[J](implicit mapping: J2SMapping[J]): J2SMapping[Seq[J]] { type S = immutable.Seq[mapping.S] } = @@ -32,23 +36,26 @@ object J2SMapping { def toScala(javaObject: Seq[J]): S = javaObject.map(mapping.toScala(_)).toList } } + /** INTERNAL API */ -trait S2JMapping[S] { +private[http] trait S2JMapping[S] { type J def toJava(scalaObject: S): J } + /** INTERNAL API */ -object S2JMapping { +private[http] object S2JMapping { implicit def fromJavaMapping[S](implicit mapping: JavaMapping[_, S]): S2JMapping[S] { type J = mapping.J } = mapping } /** INTERNAL API */ -trait JavaMapping[_J, _S] extends J2SMapping[_J] with S2JMapping[_S] { +private[http] trait JavaMapping[_J, _S] extends J2SMapping[_J] with S2JMapping[_S] { type J = _J type S = _S } + /** INTERNAL API */ -object JavaMapping { +private[http] object JavaMapping { trait AsScala[S] { def asScala: S } @@ -74,6 +81,13 @@ object JavaMapping { } } + /** This trivial mapping isn't enabled by default to prevent it from conflicting with the `Inherited ones `*/ + def identity[T]: JavaMapping[T, T] = + new JavaMapping[T, T] { + def toJava(scalaObject: T): J = scalaObject + def toScala(javaObject: T): S = javaObject + } + implicit def iterableMapping[_J, _S](implicit mapping: JavaMapping[_J, _S]): JavaMapping[jl.Iterable[_J], immutable.Seq[_S]] = new JavaMapping[jl.Iterable[_J], immutable.Seq[_S]] { import collection.JavaConverters._ @@ -103,6 +117,16 @@ object JavaMapping { scaladsl.Flow[JIn].map(inMapping.toScala(_)).viaMat(scalaObject)(scaladsl.Keep.right).map(outMapping.toJava(_)) } } + implicit def pairMapping[J1, J2, S1, S2](implicit _1Mapping: JavaMapping[J1, S1], _2Mapping: JavaMapping[J2, S2]): JavaMapping[Pair[J1, J2], (S1, S2)] = + new JavaMapping[Pair[J1, J2], (S1, S2)] { + def toJava(scalaObject: (S1, S2)): J = Pair(_1Mapping.toJava(scalaObject._1), _2Mapping.toJava(scalaObject._2)) + def toScala(javaObject: Pair[J1, J2]): (S1, S2) = (_1Mapping.toScala(javaObject.first), _2Mapping.toScala(javaObject.second)) + } + implicit def tryMapping[_J, _S](implicit mapping: JavaMapping[_J, _S]): JavaMapping[Try[_J], Try[_S]] = + new JavaMapping[Try[_J], Try[_S]] { + def toScala(javaObject: Try[_J]): S = javaObject.map(mapping.toScala(_)) + def toJava(scalaObject: Try[_S]): J = scalaObject.map(mapping.toJava(_)) + } implicit object StringIdentity extends Identity[String] diff --git a/akka-http-core/src/main/scala/akka/http/javadsl/Http.scala b/akka-http-core/src/main/scala/akka/http/javadsl/Http.scala index 01c8a00f1d..af763fbb9f 100644 --- a/akka-http-core/src/main/scala/akka/http/javadsl/Http.scala +++ b/akka-http-core/src/main/scala/akka/http/javadsl/Http.scala @@ -6,12 +6,13 @@ package akka.http.javadsl import java.lang.{ Iterable ⇒ JIterable } import java.net.InetSocketAddress +import akka.http.impl.util.JavaMapping + import scala.language.implicitConversions import scala.concurrent.Future import scala.util.Try import akka.stream.scaladsl.Keep -import akka.japi.Util._ -import akka.japi.{ Option, Function } +import akka.japi.{ Pair, Option, Function } import akka.actor.{ ExtendedActorSystem, ActorSystem, ExtensionIdProvider, ExtensionId } import akka.event.LoggingAdapter import akka.io.Inet @@ -240,13 +241,13 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension { * In order to allow for easy response-to-request association the flow takes in a custom, opaque context * object of type ``T`` from the application which is emitted together with the corresponding response. */ - def newHostConnectionPool[T](host: String, port: Int, materializer: Materializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] = + def newHostConnectionPool[T](host: String, port: Int, materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], HostConnectionPool] = adaptTupleFlow(delegate.newHostConnectionPool[T](host, port)(materializer)) /** * Same as [[newHostConnectionPool]] but with HTTPS encryption. */ - def newHostConnectionPoolTls[T](host: String, port: Int, materializer: Materializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] = + def newHostConnectionPoolTls[T](host: String, port: Int, materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], HostConnectionPool] = adaptTupleFlow(delegate.newHostConnectionPoolTls[T](host, port)(materializer)) /** @@ -266,7 +267,7 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension { def newHostConnectionPool[T](host: String, port: Int, options: JIterable[Inet.SocketOption], settings: ConnectionPoolSettings, - log: LoggingAdapter, materializer: Materializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] = + log: LoggingAdapter, materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], HostConnectionPool] = adaptTupleFlow(delegate.newHostConnectionPool[T](host, port, settings, log)(materializer)) /** @@ -279,7 +280,7 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension { options: JIterable[Inet.SocketOption], settings: ConnectionPoolSettings, httpsContext: Option[HttpsContext], - log: LoggingAdapter, materializer: Materializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] = + log: LoggingAdapter, materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], HostConnectionPool] = adaptTupleFlow(delegate.newHostConnectionPoolTls[T](host, port, settings, httpsContext.map(_.asInstanceOf[akka.http.scaladsl.HttpsContext]), log)(materializer)) @@ -297,7 +298,7 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension { * In order to allow for easy response-to-request association the flow takes in a custom, opaque context * object of type ``T`` from the application which is emitted together with the corresponding response. */ - def newHostConnectionPool[T](setup: HostConnectionPoolSetup, materializer: Materializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] = + def newHostConnectionPool[T](setup: HostConnectionPoolSetup, materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], HostConnectionPool] = adaptTupleFlow(delegate.newHostConnectionPool[T](setup)(materializer)) /** @@ -317,13 +318,13 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension { * In order to allow for easy response-to-request association the flow takes in a custom, opaque context * object of type ``T`` from the application which is emitted together with the corresponding response. */ - def cachedHostConnectionPool[T](host: String, port: Int, materializer: Materializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] = + def cachedHostConnectionPool[T](host: String, port: Int, materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], HostConnectionPool] = adaptTupleFlow(delegate.cachedHostConnectionPool[T](host, port)(materializer)) /** * Same as [[cachedHostConnectionPool]] but with HTTPS encryption. */ - def cachedHostConnectionPoolTls[T](host: String, port: Int, materializer: Materializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] = + def cachedHostConnectionPoolTls[T](host: String, port: Int, materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], HostConnectionPool] = adaptTupleFlow(delegate.cachedHostConnectionPoolTls[T](host, port)(materializer)) /** @@ -345,7 +346,7 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension { */ def cachedHostConnectionPool[T](host: String, port: Int, settings: ConnectionPoolSettings, - log: LoggingAdapter, materializer: Materializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] = + log: LoggingAdapter, materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], HostConnectionPool] = adaptTupleFlow(delegate.cachedHostConnectionPool[T](host, port, settings, log)(materializer)) /** @@ -357,7 +358,7 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension { def cachedHostConnectionPoolTls[T](host: String, port: Int, settings: ConnectionPoolSettings, httpsContext: Option[HttpsContext], - log: LoggingAdapter, materializer: Materializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] = + log: LoggingAdapter, materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], HostConnectionPool] = adaptTupleFlow(delegate.cachedHostConnectionPoolTls[T](host, port, settings, httpsContext.map(_.asInstanceOf[akka.http.scaladsl.HttpsContext]), log)(materializer)) @@ -378,7 +379,7 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension { * In order to allow for easy response-to-request association the flow takes in a custom, opaque context * object of type ``T`` from the application which is emitted together with the corresponding response. */ - def cachedHostConnectionPool[T](setup: HostConnectionPoolSetup, materializer: Materializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] = + def cachedHostConnectionPool[T](setup: HostConnectionPoolSetup, materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], HostConnectionPool] = adaptTupleFlow(delegate.cachedHostConnectionPool[T](setup)(materializer)) /** @@ -393,7 +394,7 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension { * In order to allow for easy response-to-request association the flow takes in a custom, opaque context * object of type ``T`` from the application which is emitted together with the corresponding response. */ - def superPool[T](materializer: Materializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), Unit] = + def superPool[T](materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], Unit] = adaptTupleFlow(delegate.superPool[T]()(materializer)) /** @@ -413,7 +414,7 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension { */ def superPool[T](settings: ConnectionPoolSettings, httpsContext: Option[HttpsContext], - log: LoggingAdapter, materializer: Materializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), Unit] = + log: LoggingAdapter, materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], Unit] = adaptTupleFlow(delegate.superPool[T](settings, httpsContext, log)(materializer)) /** @@ -463,9 +464,8 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension { def setDefaultClientHttpsContext(context: HttpsContext): Unit = delegate.setDefaultClientHttpsContext(context.asInstanceOf[akka.http.scaladsl.HttpsContext]) - private def adaptTupleFlow[T, Mat](scalaFlow: akka.stream.scaladsl.Flow[(scaladsl.model.HttpRequest, T), (Try[HttpResponse], T), Mat]): Flow[(HttpRequest, T), (Try[HttpResponse], T), Mat] = - Flow.wrap { - // we know that downcasting javadsl.model.HttpRequest => scaladsl.model.HttpRequest will always work - scalaFlow.asInstanceOf[akka.stream.scaladsl.Flow[(HttpRequest, T), (Try[HttpResponse], T), Mat]] - } + private def adaptTupleFlow[T, Mat](scalaFlow: akka.stream.scaladsl.Flow[(scaladsl.model.HttpRequest, T), (Try[scaladsl.model.HttpResponse], T), Mat]): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], Mat] = { + implicit val _ = JavaMapping.identity[T] + JavaMapping.toJava(scalaFlow)(JavaMapping.flowMapping[Pair[HttpRequest, T], (scaladsl.model.HttpRequest, T), Pair[Try[HttpResponse], T], (Try[scaladsl.model.HttpResponse], T), Mat]) + } }