!htp #18017 don't use scala.Tuple2 in javadsl

This commit is contained in:
Johannes Rudolph 2015-07-23 17:21:30 +02:00
parent 3e7621c836
commit e14773c3d9
3 changed files with 56 additions and 37 deletions

View file

@ -6,12 +6,8 @@ package docs.http.javadsl;
import akka.actor.ActorSystem; import akka.actor.ActorSystem;
import akka.http.javadsl.HostConnectionPool; import akka.http.javadsl.HostConnectionPool;
import akka.japi.Option;
import akka.japi.Pair; import akka.japi.Pair;
import akka.util.ByteString;
import org.junit.Test;
import scala.Tuple2;
import scala.concurrent.Future; import scala.concurrent.Future;
import akka.stream.ActorMaterializer; import akka.stream.ActorMaterializer;
import akka.stream.javadsl.*; import akka.stream.javadsl.*;
@ -44,21 +40,20 @@ public class HttpClientExampleDocTest {
final ActorSystem system = ActorSystem.create(); final ActorSystem system = ActorSystem.create();
final ActorMaterializer materializer = ActorMaterializer.create(system); final ActorMaterializer materializer = ActorMaterializer.create(system);
// construct a pool client flow with context type `Int` // construct a pool client flow with context type `Integer`
// TODO these Tuple2 will be changed to akka.japi.Pair
final Flow< final Flow<
Tuple2<HttpRequest, Integer>, Pair<HttpRequest, Integer>,
Tuple2<Try<HttpResponse>, Integer>, Pair<Try<HttpResponse>, Integer>,
HostConnectionPool> poolClientFlow = HostConnectionPool> poolClientFlow =
Http.get(system).<Integer>cachedHostConnectionPool("akka.io", 80, materializer); Http.get(system).<Integer>cachedHostConnectionPool("akka.io", 80, materializer);
// construct a pool client flow with context type `Int` // construct a pool client flow with context type `Integer`
final Future<Tuple2<Try<HttpResponse>, Integer>> responseFuture = final Future<Pair<Try<HttpResponse>, Integer>> responseFuture =
Source Source
.single(Pair.create(HttpRequest.create("/"), 42).toScala()) .single(Pair.create(HttpRequest.create("/"), 42))
.via(poolClientFlow) .via(poolClientFlow)
.runWith(Sink.<Tuple2<Try<HttpResponse>, Integer>>head(), materializer); .runWith(Sink.<Pair<Try<HttpResponse>, Integer>>head(), materializer);
//#host-level-example //#host-level-example
} }

View file

@ -7,6 +7,7 @@ package akka.http.impl.util
import java.net.InetAddress import java.net.InetAddress
import java.{ util ju, lang jl } import java.{ util ju, lang jl }
import akka.http.scaladsl.model.ws.Message import akka.http.scaladsl.model.ws.Message
import akka.japi.Pair
import akka.stream.javadsl import akka.stream.javadsl
import akka.stream.scaladsl import akka.stream.scaladsl
@ -17,13 +18,16 @@ import akka.http.impl.model.JavaUri
import akka.http.javadsl.{ model jm } import akka.http.javadsl.{ model jm }
import akka.http.scaladsl.{ model sm } import akka.http.scaladsl.{ model sm }
import scala.util.Try
/** INTERNAL API */ /** INTERNAL API */
trait J2SMapping[J] { private[http] trait J2SMapping[J] {
type S type S
def toScala(javaObject: J): S def toScala(javaObject: J): S
} }
/** INTERNAL API */ /** INTERNAL API */
object J2SMapping { private[http] object J2SMapping {
implicit def fromJavaMapping[J](implicit mapping: JavaMapping[J, _]): J2SMapping[J] { type S = mapping.S } = mapping implicit def fromJavaMapping[J](implicit mapping: JavaMapping[J, _]): J2SMapping[J] { type S = mapping.S } = mapping
implicit def seqMapping[J](implicit mapping: J2SMapping[J]): J2SMapping[Seq[J]] { type S = immutable.Seq[mapping.S] } = implicit def seqMapping[J](implicit mapping: J2SMapping[J]): J2SMapping[Seq[J]] { type S = immutable.Seq[mapping.S] } =
@ -32,23 +36,26 @@ object J2SMapping {
def toScala(javaObject: Seq[J]): S = javaObject.map(mapping.toScala(_)).toList def toScala(javaObject: Seq[J]): S = javaObject.map(mapping.toScala(_)).toList
} }
} }
/** INTERNAL API */ /** INTERNAL API */
trait S2JMapping[S] { private[http] trait S2JMapping[S] {
type J type J
def toJava(scalaObject: S): J def toJava(scalaObject: S): J
} }
/** INTERNAL API */ /** INTERNAL API */
object S2JMapping { private[http] object S2JMapping {
implicit def fromJavaMapping[S](implicit mapping: JavaMapping[_, S]): S2JMapping[S] { type J = mapping.J } = mapping implicit def fromJavaMapping[S](implicit mapping: JavaMapping[_, S]): S2JMapping[S] { type J = mapping.J } = mapping
} }
/** INTERNAL API */ /** INTERNAL API */
trait JavaMapping[_J, _S] extends J2SMapping[_J] with S2JMapping[_S] { private[http] trait JavaMapping[_J, _S] extends J2SMapping[_J] with S2JMapping[_S] {
type J = _J type J = _J
type S = _S type S = _S
} }
/** INTERNAL API */ /** INTERNAL API */
object JavaMapping { private[http] object JavaMapping {
trait AsScala[S] { trait AsScala[S] {
def asScala: S def asScala: S
} }
@ -74,6 +81,13 @@ object JavaMapping {
} }
} }
/** This trivial mapping isn't enabled by default to prevent it from conflicting with the `Inherited ones `*/
def identity[T]: JavaMapping[T, T] =
new JavaMapping[T, T] {
def toJava(scalaObject: T): J = scalaObject
def toScala(javaObject: T): S = javaObject
}
implicit def iterableMapping[_J, _S](implicit mapping: JavaMapping[_J, _S]): JavaMapping[jl.Iterable[_J], immutable.Seq[_S]] = implicit def iterableMapping[_J, _S](implicit mapping: JavaMapping[_J, _S]): JavaMapping[jl.Iterable[_J], immutable.Seq[_S]] =
new JavaMapping[jl.Iterable[_J], immutable.Seq[_S]] { new JavaMapping[jl.Iterable[_J], immutable.Seq[_S]] {
import collection.JavaConverters._ import collection.JavaConverters._
@ -103,6 +117,16 @@ object JavaMapping {
scaladsl.Flow[JIn].map(inMapping.toScala(_)).viaMat(scalaObject)(scaladsl.Keep.right).map(outMapping.toJava(_)) scaladsl.Flow[JIn].map(inMapping.toScala(_)).viaMat(scalaObject)(scaladsl.Keep.right).map(outMapping.toJava(_))
} }
} }
implicit def pairMapping[J1, J2, S1, S2](implicit _1Mapping: JavaMapping[J1, S1], _2Mapping: JavaMapping[J2, S2]): JavaMapping[Pair[J1, J2], (S1, S2)] =
new JavaMapping[Pair[J1, J2], (S1, S2)] {
def toJava(scalaObject: (S1, S2)): J = Pair(_1Mapping.toJava(scalaObject._1), _2Mapping.toJava(scalaObject._2))
def toScala(javaObject: Pair[J1, J2]): (S1, S2) = (_1Mapping.toScala(javaObject.first), _2Mapping.toScala(javaObject.second))
}
implicit def tryMapping[_J, _S](implicit mapping: JavaMapping[_J, _S]): JavaMapping[Try[_J], Try[_S]] =
new JavaMapping[Try[_J], Try[_S]] {
def toScala(javaObject: Try[_J]): S = javaObject.map(mapping.toScala(_))
def toJava(scalaObject: Try[_S]): J = scalaObject.map(mapping.toJava(_))
}
implicit object StringIdentity extends Identity[String] implicit object StringIdentity extends Identity[String]

View file

@ -6,12 +6,13 @@ package akka.http.javadsl
import java.lang.{ Iterable JIterable } import java.lang.{ Iterable JIterable }
import java.net.InetSocketAddress import java.net.InetSocketAddress
import akka.http.impl.util.JavaMapping
import scala.language.implicitConversions import scala.language.implicitConversions
import scala.concurrent.Future import scala.concurrent.Future
import scala.util.Try import scala.util.Try
import akka.stream.scaladsl.Keep import akka.stream.scaladsl.Keep
import akka.japi.Util._ import akka.japi.{ Pair, Option, Function }
import akka.japi.{ Option, Function }
import akka.actor.{ ExtendedActorSystem, ActorSystem, ExtensionIdProvider, ExtensionId } import akka.actor.{ ExtendedActorSystem, ActorSystem, ExtensionIdProvider, ExtensionId }
import akka.event.LoggingAdapter import akka.event.LoggingAdapter
import akka.io.Inet import akka.io.Inet
@ -240,13 +241,13 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension {
* In order to allow for easy response-to-request association the flow takes in a custom, opaque context * In order to allow for easy response-to-request association the flow takes in a custom, opaque context
* object of type ``T`` from the application which is emitted together with the corresponding response. * object of type ``T`` from the application which is emitted together with the corresponding response.
*/ */
def newHostConnectionPool[T](host: String, port: Int, materializer: Materializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] = def newHostConnectionPool[T](host: String, port: Int, materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], HostConnectionPool] =
adaptTupleFlow(delegate.newHostConnectionPool[T](host, port)(materializer)) adaptTupleFlow(delegate.newHostConnectionPool[T](host, port)(materializer))
/** /**
* Same as [[newHostConnectionPool]] but with HTTPS encryption. * Same as [[newHostConnectionPool]] but with HTTPS encryption.
*/ */
def newHostConnectionPoolTls[T](host: String, port: Int, materializer: Materializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] = def newHostConnectionPoolTls[T](host: String, port: Int, materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], HostConnectionPool] =
adaptTupleFlow(delegate.newHostConnectionPoolTls[T](host, port)(materializer)) adaptTupleFlow(delegate.newHostConnectionPoolTls[T](host, port)(materializer))
/** /**
@ -266,7 +267,7 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension {
def newHostConnectionPool[T](host: String, port: Int, def newHostConnectionPool[T](host: String, port: Int,
options: JIterable[Inet.SocketOption], options: JIterable[Inet.SocketOption],
settings: ConnectionPoolSettings, settings: ConnectionPoolSettings,
log: LoggingAdapter, materializer: Materializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] = log: LoggingAdapter, materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], HostConnectionPool] =
adaptTupleFlow(delegate.newHostConnectionPool[T](host, port, settings, log)(materializer)) adaptTupleFlow(delegate.newHostConnectionPool[T](host, port, settings, log)(materializer))
/** /**
@ -279,7 +280,7 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension {
options: JIterable[Inet.SocketOption], options: JIterable[Inet.SocketOption],
settings: ConnectionPoolSettings, settings: ConnectionPoolSettings,
httpsContext: Option[HttpsContext], httpsContext: Option[HttpsContext],
log: LoggingAdapter, materializer: Materializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] = log: LoggingAdapter, materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], HostConnectionPool] =
adaptTupleFlow(delegate.newHostConnectionPoolTls[T](host, port, settings, adaptTupleFlow(delegate.newHostConnectionPoolTls[T](host, port, settings,
httpsContext.map(_.asInstanceOf[akka.http.scaladsl.HttpsContext]), log)(materializer)) httpsContext.map(_.asInstanceOf[akka.http.scaladsl.HttpsContext]), log)(materializer))
@ -297,7 +298,7 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension {
* In order to allow for easy response-to-request association the flow takes in a custom, opaque context * In order to allow for easy response-to-request association the flow takes in a custom, opaque context
* object of type ``T`` from the application which is emitted together with the corresponding response. * object of type ``T`` from the application which is emitted together with the corresponding response.
*/ */
def newHostConnectionPool[T](setup: HostConnectionPoolSetup, materializer: Materializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] = def newHostConnectionPool[T](setup: HostConnectionPoolSetup, materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], HostConnectionPool] =
adaptTupleFlow(delegate.newHostConnectionPool[T](setup)(materializer)) adaptTupleFlow(delegate.newHostConnectionPool[T](setup)(materializer))
/** /**
@ -317,13 +318,13 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension {
* In order to allow for easy response-to-request association the flow takes in a custom, opaque context * In order to allow for easy response-to-request association the flow takes in a custom, opaque context
* object of type ``T`` from the application which is emitted together with the corresponding response. * object of type ``T`` from the application which is emitted together with the corresponding response.
*/ */
def cachedHostConnectionPool[T](host: String, port: Int, materializer: Materializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] = def cachedHostConnectionPool[T](host: String, port: Int, materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], HostConnectionPool] =
adaptTupleFlow(delegate.cachedHostConnectionPool[T](host, port)(materializer)) adaptTupleFlow(delegate.cachedHostConnectionPool[T](host, port)(materializer))
/** /**
* Same as [[cachedHostConnectionPool]] but with HTTPS encryption. * Same as [[cachedHostConnectionPool]] but with HTTPS encryption.
*/ */
def cachedHostConnectionPoolTls[T](host: String, port: Int, materializer: Materializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] = def cachedHostConnectionPoolTls[T](host: String, port: Int, materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], HostConnectionPool] =
adaptTupleFlow(delegate.cachedHostConnectionPoolTls[T](host, port)(materializer)) adaptTupleFlow(delegate.cachedHostConnectionPoolTls[T](host, port)(materializer))
/** /**
@ -345,7 +346,7 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension {
*/ */
def cachedHostConnectionPool[T](host: String, port: Int, def cachedHostConnectionPool[T](host: String, port: Int,
settings: ConnectionPoolSettings, settings: ConnectionPoolSettings,
log: LoggingAdapter, materializer: Materializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] = log: LoggingAdapter, materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], HostConnectionPool] =
adaptTupleFlow(delegate.cachedHostConnectionPool[T](host, port, settings, log)(materializer)) adaptTupleFlow(delegate.cachedHostConnectionPool[T](host, port, settings, log)(materializer))
/** /**
@ -357,7 +358,7 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension {
def cachedHostConnectionPoolTls[T](host: String, port: Int, def cachedHostConnectionPoolTls[T](host: String, port: Int,
settings: ConnectionPoolSettings, settings: ConnectionPoolSettings,
httpsContext: Option[HttpsContext], httpsContext: Option[HttpsContext],
log: LoggingAdapter, materializer: Materializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] = log: LoggingAdapter, materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], HostConnectionPool] =
adaptTupleFlow(delegate.cachedHostConnectionPoolTls[T](host, port, settings, adaptTupleFlow(delegate.cachedHostConnectionPoolTls[T](host, port, settings,
httpsContext.map(_.asInstanceOf[akka.http.scaladsl.HttpsContext]), log)(materializer)) httpsContext.map(_.asInstanceOf[akka.http.scaladsl.HttpsContext]), log)(materializer))
@ -378,7 +379,7 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension {
* In order to allow for easy response-to-request association the flow takes in a custom, opaque context * In order to allow for easy response-to-request association the flow takes in a custom, opaque context
* object of type ``T`` from the application which is emitted together with the corresponding response. * object of type ``T`` from the application which is emitted together with the corresponding response.
*/ */
def cachedHostConnectionPool[T](setup: HostConnectionPoolSetup, materializer: Materializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] = def cachedHostConnectionPool[T](setup: HostConnectionPoolSetup, materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], HostConnectionPool] =
adaptTupleFlow(delegate.cachedHostConnectionPool[T](setup)(materializer)) adaptTupleFlow(delegate.cachedHostConnectionPool[T](setup)(materializer))
/** /**
@ -393,7 +394,7 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension {
* In order to allow for easy response-to-request association the flow takes in a custom, opaque context * In order to allow for easy response-to-request association the flow takes in a custom, opaque context
* object of type ``T`` from the application which is emitted together with the corresponding response. * object of type ``T`` from the application which is emitted together with the corresponding response.
*/ */
def superPool[T](materializer: Materializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), Unit] = def superPool[T](materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], Unit] =
adaptTupleFlow(delegate.superPool[T]()(materializer)) adaptTupleFlow(delegate.superPool[T]()(materializer))
/** /**
@ -413,7 +414,7 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension {
*/ */
def superPool[T](settings: ConnectionPoolSettings, def superPool[T](settings: ConnectionPoolSettings,
httpsContext: Option[HttpsContext], httpsContext: Option[HttpsContext],
log: LoggingAdapter, materializer: Materializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), Unit] = log: LoggingAdapter, materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], Unit] =
adaptTupleFlow(delegate.superPool[T](settings, httpsContext, log)(materializer)) adaptTupleFlow(delegate.superPool[T](settings, httpsContext, log)(materializer))
/** /**
@ -463,9 +464,8 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension {
def setDefaultClientHttpsContext(context: HttpsContext): Unit = def setDefaultClientHttpsContext(context: HttpsContext): Unit =
delegate.setDefaultClientHttpsContext(context.asInstanceOf[akka.http.scaladsl.HttpsContext]) delegate.setDefaultClientHttpsContext(context.asInstanceOf[akka.http.scaladsl.HttpsContext])
private def adaptTupleFlow[T, Mat](scalaFlow: akka.stream.scaladsl.Flow[(scaladsl.model.HttpRequest, T), (Try[HttpResponse], T), Mat]): Flow[(HttpRequest, T), (Try[HttpResponse], T), Mat] = private def adaptTupleFlow[T, Mat](scalaFlow: akka.stream.scaladsl.Flow[(scaladsl.model.HttpRequest, T), (Try[scaladsl.model.HttpResponse], T), Mat]): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], Mat] = {
Flow.wrap { implicit val _ = JavaMapping.identity[T]
// we know that downcasting javadsl.model.HttpRequest => scaladsl.model.HttpRequest will always work JavaMapping.toJava(scalaFlow)(JavaMapping.flowMapping[Pair[HttpRequest, T], (scaladsl.model.HttpRequest, T), Pair[Try[HttpResponse], T], (Try[scaladsl.model.HttpResponse], T), Mat])
scalaFlow.asInstanceOf[akka.stream.scaladsl.Flow[(HttpRequest, T), (Try[HttpResponse], T), Mat]]
} }
} }