+htc #17275 Java-side of Websocket client API

This commit is contained in:
Johannes Rudolph 2015-10-12 14:21:07 +02:00
parent 00b4eefab5
commit d4b5f29c57
4 changed files with 313 additions and 1 deletions

View file

@ -7,6 +7,8 @@ package akka.http.javadsl
import java.lang.{ Iterable JIterable }
import java.net.InetSocketAddress
import akka.http.impl.util.JavaMapping
import akka.http.javadsl.model.ws._
import akka.stream
import akka.stream.io.{ SslTlsInbound, SslTlsOutbound }
import scala.language.implicitConversions
@ -496,6 +498,93 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension {
log: LoggingAdapter, materializer: Materializer): Future[HttpResponse] =
delegate.singleRequest(request.asScala, settings, httpsContext, log)(materializer)
/**
* Constructs a Websocket [[BidiFlow]].
*
* The layer is not reusable and must only be materialized once.
*/
def websocketClientLayer(request: WebsocketRequest): BidiFlow[Message, SslTlsOutbound, SslTlsInbound, Message, Future[WebsocketUpgradeResponse]] =
adaptWsBidiFlow(delegate.websocketClientLayer(request.asScala))
/**
* Constructs a Websocket [[BidiFlow]] using the configured default [[ClientConnectionSettings]],
* configured using the `akka.http.client` config section.
*
* The layer is not reusable and must only be materialized once.
*/
def websocketClientLayer(request: WebsocketRequest,
settings: ClientConnectionSettings): BidiFlow[Message, SslTlsOutbound, SslTlsInbound, Message, Future[WebsocketUpgradeResponse]] =
adaptWsBidiFlow(delegate.websocketClientLayer(request.asScala, settings))
/**
* Constructs a Websocket [[BidiFlow]] using the configured default [[ClientConnectionSettings]],
* configured using the `akka.http.client` config section.
*
* The layer is not reusable and must only be materialized once.
*/
def websocketClientLayer(request: WebsocketRequest,
settings: ClientConnectionSettings,
log: LoggingAdapter): BidiFlow[Message, SslTlsOutbound, SslTlsInbound, Message, Future[WebsocketUpgradeResponse]] =
adaptWsBidiFlow(delegate.websocketClientLayer(request.asScala, settings, log))
/**
* Constructs a flow that once materialized establishes a Websocket connection to the given Uri.
*
* The layer is not reusable and must only be materialized once.
*/
def websocketClientFlow(request: WebsocketRequest): Flow[Message, Message, Future[WebsocketUpgradeResponse]] =
adaptWsFlow {
delegate.websocketClientFlow(request.asScala)
}
/**
* Constructs a flow that once materialized establishes a Websocket connection to the given Uri.
*
* The layer is not reusable and must only be materialized once.
*/
def websocketClientFlow(request: WebsocketRequest,
localAddress: Option[InetSocketAddress],
settings: ClientConnectionSettings,
httpsContext: Option[HttpsContext],
log: LoggingAdapter): Flow[Message, Message, Future[WebsocketUpgradeResponse]] =
adaptWsFlow {
delegate.websocketClientFlow(request.asScala, localAddress, settings, httpsContext, log)
}
/**
* Runs a single Websocket conversation given a Uri and a flow that represents the client side of the
* Websocket conversation.
*/
def singleWebsocketRequest[T](request: WebsocketRequest,
clientFlow: Flow[Message, Message, T],
materializer: Materializer): Pair[Future[WebsocketUpgradeResponse], T] =
adaptWsResultTuple {
delegate.singleWebsocketRequest(
request.asScala,
adaptWsFlow[T](clientFlow))(materializer)
}
/**
* Runs a single Websocket conversation given a Uri and a flow that represents the client side of the
* Websocket conversation.
*/
def singleWebsocketRequest[T](request: WebsocketRequest,
clientFlow: Flow[Message, Message, T],
localAddress: Option[InetSocketAddress],
settings: ClientConnectionSettings,
httpsContext: Option[HttpsContext],
log: LoggingAdapter,
materializer: Materializer): Pair[Future[WebsocketUpgradeResponse], T] =
adaptWsResultTuple {
delegate.singleWebsocketRequest(
request.asScala,
adaptWsFlow[T](clientFlow),
localAddress,
settings,
httpsContext,
log)(materializer)
}
/**
* Triggers an orderly shutdown of all host connections pools currently maintained by the [[ActorSystem]].
* The returned future is completed when all pools that were live at the time of this method call
@ -517,7 +606,7 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension {
def setDefaultClientHttpsContext(context: HttpsContext): Unit =
delegate.setDefaultClientHttpsContext(context.asInstanceOf[akka.http.scaladsl.HttpsContext])
private def adaptTupleFlow[T, Mat](scalaFlow: akka.stream.scaladsl.Flow[(scaladsl.model.HttpRequest, T), (Try[scaladsl.model.HttpResponse], T), Mat]): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], Mat] = {
private def adaptTupleFlow[T, Mat](scalaFlow: stream.scaladsl.Flow[(scaladsl.model.HttpRequest, T), (Try[scaladsl.model.HttpResponse], T), Mat]): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], Mat] = {
implicit val _ = JavaMapping.identity[T]
JavaMapping.toJava(scalaFlow)(JavaMapping.flowMapping[Pair[HttpRequest, T], (scaladsl.model.HttpRequest, T), Pair[Try[HttpResponse], T], (Try[scaladsl.model.HttpResponse], T), Mat])
}
@ -531,4 +620,25 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension {
new BidiFlow(
JavaMapping.adapterBidiFlow[HttpRequest, sm.HttpRequest, sm.HttpResponse, HttpResponse]
.atop(clientLayer))
private def adaptWsBidiFlow(wsLayer: scaladsl.Http.WebsocketClientLayer): BidiFlow[Message, SslTlsOutbound, SslTlsInbound, Message, Future[WebsocketUpgradeResponse]] =
new BidiFlow(
JavaMapping.adapterBidiFlow[Message, sm.ws.Message, sm.ws.Message, Message]
.atopMat(wsLayer)((_, s) adaptWsUpgradeResponse(s)))
private def adaptWsFlow(wsLayer: stream.scaladsl.Flow[sm.ws.Message, sm.ws.Message, Future[scaladsl.Http.WebsocketUpgradeResponse]]): Flow[Message, Message, Future[WebsocketUpgradeResponse]] =
Flow.adapt(JavaMapping.adapterBidiFlow[Message, sm.ws.Message, sm.ws.Message, Message].joinMat(wsLayer)(Keep.right).mapMaterializedValue(adaptWsUpgradeResponse _))
private def adaptWsFlow[Mat](javaFlow: Flow[Message, Message, Mat]): stream.scaladsl.Flow[scaladsl.model.ws.Message, scaladsl.model.ws.Message, Mat] =
stream.scaladsl.Flow[scaladsl.model.ws.Message]
.map(Message.adapt)
.viaMat(javaFlow.asScala)(Keep.right)
.map(_.asScala)
private def adaptWsResultTuple[T](result: (Future[scaladsl.Http.WebsocketUpgradeResponse], T)): Pair[Future[WebsocketUpgradeResponse], T] =
result match {
case (fut, tMat) Pair(adaptWsUpgradeResponse(fut), tMat)
}
private def adaptWsUpgradeResponse(responseFuture: Future[scaladsl.Http.WebsocketUpgradeResponse]): Future[WebsocketUpgradeResponse] =
responseFuture.map(WebsocketUpgradeResponse.adapt)(system.dispatcher)
}

View file

@ -0,0 +1,61 @@
/*
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.http.javadsl.model.ws
import akka.http.javadsl.model.{ Uri, HttpHeader }
import akka.http.scaladsl.model.ws.{ WebsocketRequest ScalaWebsocketRequest }
/**
* Represents a Websocket request. Use `WebsocketRequest.create` to create a request
* for a target URI and then use `addHeader` or `requestSubprotocol` to set optional
* details.
*/
abstract class WebsocketRequest {
/**
* Return a copy of this request that contains the given additional header.
*/
def addHeader(header: HttpHeader): WebsocketRequest
/**
* Return a copy of this request that will require that the server uses the
* given Websocket subprotocol.
*/
def requestSubprotocol(subprotocol: String): WebsocketRequest
def asScala: ScalaWebsocketRequest
}
object WebsocketRequest {
import akka.http.impl.util.JavaMapping.Implicits._
/**
* Creates a WebsocketRequest to a target URI. Use the methods on `WebsocketRequest`
* to specify further details.
*/
def create(uri: Uri): WebsocketRequest =
wrap(ScalaWebsocketRequest(uri.asScala))
/**
* Creates a WebsocketRequest to a target URI. Use the methods on `WebsocketRequest`
* to specify further details.
*/
def create(uriString: String): WebsocketRequest =
create(Uri.create(uriString))
/**
* Wraps a Scala version of WebsocketRequest.
*/
def wrap(scalaRequest: ScalaWebsocketRequest): WebsocketRequest =
new WebsocketRequest {
def addHeader(header: HttpHeader): WebsocketRequest =
transform(s s.copy(extraHeaders = s.extraHeaders :+ header.asScala))
def requestSubprotocol(subprotocol: String): WebsocketRequest =
transform(_.copy(subprotocol = Some(subprotocol)))
def asScala: ScalaWebsocketRequest = scalaRequest
def transform(f: ScalaWebsocketRequest ScalaWebsocketRequest): WebsocketRequest =
wrap(f(asScala))
}
}

View file

@ -0,0 +1,58 @@
/*
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.http.javadsl.model.ws
import akka.http.javadsl.model.HttpResponse
import akka.http.scaladsl
import akka.http.scaladsl.Http.{ InvalidUpgradeResponse, ValidUpgrade }
import akka.japi.Option
/**
* Represents an upgrade response for a Websocket upgrade request. Can either be valid, in which
* case the `chosenSubprotocol` method is valid, or if invalid, the `invalidationReason` method
* can be used to find out why the upgrade failed.
*/
trait WebsocketUpgradeResponse {
def isValid: Boolean
/**
* Returns the response object as received from the server for further inspection.
*/
def response: HttpResponse
/**
* If valid, returns `Some(subprotocol)` (if any was requested), or `None` if none was
* chosen or offered.
*/
def chosenSubprotocol: Option[String]
/**
* If invalid, the reason why the server's upgrade response could not be accepted.
*/
def invalidationReason: String
}
object WebsocketUpgradeResponse {
import akka.http.impl.util.JavaMapping.Implicits._
def adapt(scalaResponse: scaladsl.Http.WebsocketUpgradeResponse): WebsocketUpgradeResponse =
scalaResponse match {
case ValidUpgrade(response, chosen)
new WebsocketUpgradeResponse {
def isValid: Boolean = true
def response: HttpResponse = response
def chosenSubprotocol: Option[String] = chosen.asJava
def invalidationReason: String =
throw new UnsupportedOperationException("invalidationReason must not be called for valid response")
}
case InvalidUpgradeResponse(response, cause)
new WebsocketUpgradeResponse {
def isValid: Boolean = false
def response: HttpResponse = response
def chosenSubprotocol: Option[String] = throw new UnsupportedOperationException("chosenSubprotocol must not be called for valid response")
def invalidationReason: String = cause
}
}
}

View file

@ -0,0 +1,83 @@
/*
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.http.javadsl;
import akka.actor.ActorSystem;
import akka.dispatch.Futures;
import akka.http.javadsl.model.ws.Message;
import akka.http.javadsl.model.ws.TextMessage;
import akka.http.javadsl.model.ws.WebsocketRequest;
import akka.japi.function.Function;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Keep;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
import scala.runtime.BoxedUnit;
import java.util.Arrays;
import java.util.List;
public class WSEchoTestClientApp {
private static final Function<Message, String> messageStringifier = new Function<Message, String>() {
@Override
public String apply(Message msg) throws Exception {
if (msg.isText() && msg.asTextMessage().isStrict())
return msg.asTextMessage().getStrictText();
else
throw new IllegalArgumentException("Unexpected message "+msg);
}
};
public static void main(String[] args) throws Exception {
ActorSystem system = ActorSystem.create();
try {
final Materializer materializer = ActorMaterializer.create(system);
final Future<Message> ignoredMessage = Futures.successful((Message) TextMessage.create("blub"));
final Future<Message> delayedCompletion =
akka.pattern.Patterns.after(
FiniteDuration.apply(1, "second"),
system.scheduler(),
system.dispatcher(),
ignoredMessage);
Source<Message, BoxedUnit> echoSource =
Source.from(Arrays.<Message>asList(
TextMessage.create("abc"),
TextMessage.create("def"),
TextMessage.create("ghi")
)).concat(Source.from(delayedCompletion).drop(1));
Sink<Message, Future<List<String>>> echoSink =
Flow.of(Message.class)
.map(messageStringifier)
.grouped(1000)
.toMat(Sink.<List<String>>head(), Keep.<BoxedUnit, Future<List<String>>>right());
Flow<Message, Message, Future<List<String>>> echoClient =
Flow.wrap(echoSink, echoSource, Keep.<Future<List<String>>, BoxedUnit>left());
Future<List<String>> result =
Http.get(system).singleWebsocketRequest(
WebsocketRequest.create("ws://echo.websocket.org"),
echoClient,
materializer
).second();
List<String> messages = Await.result(result, FiniteDuration.apply(10, "second"));
System.out.println("Collected " + messages.size() + " messages:");
for (String msg: messages)
System.out.println(msg);
} finally {
system.shutdown();
}
}
}