diff --git a/akka-http-core/src/main/scala/akka/http/javadsl/Http.scala b/akka-http-core/src/main/scala/akka/http/javadsl/Http.scala index 632c1fddca..869bd6a9b6 100644 --- a/akka-http-core/src/main/scala/akka/http/javadsl/Http.scala +++ b/akka-http-core/src/main/scala/akka/http/javadsl/Http.scala @@ -7,6 +7,8 @@ package akka.http.javadsl import java.lang.{ Iterable ⇒ JIterable } import java.net.InetSocketAddress import akka.http.impl.util.JavaMapping +import akka.http.javadsl.model.ws._ +import akka.stream import akka.stream.io.{ SslTlsInbound, SslTlsOutbound } import scala.language.implicitConversions @@ -496,6 +498,93 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension { log: LoggingAdapter, materializer: Materializer): Future[HttpResponse] = delegate.singleRequest(request.asScala, settings, httpsContext, log)(materializer) + /** + * Constructs a Websocket [[BidiFlow]]. + * + * The layer is not reusable and must only be materialized once. + */ + def websocketClientLayer(request: WebsocketRequest): BidiFlow[Message, SslTlsOutbound, SslTlsInbound, Message, Future[WebsocketUpgradeResponse]] = + adaptWsBidiFlow(delegate.websocketClientLayer(request.asScala)) + + /** + * Constructs a Websocket [[BidiFlow]] using the configured default [[ClientConnectionSettings]], + * configured using the `akka.http.client` config section. + * + * The layer is not reusable and must only be materialized once. + */ + def websocketClientLayer(request: WebsocketRequest, + settings: ClientConnectionSettings): BidiFlow[Message, SslTlsOutbound, SslTlsInbound, Message, Future[WebsocketUpgradeResponse]] = + adaptWsBidiFlow(delegate.websocketClientLayer(request.asScala, settings)) + + /** + * Constructs a Websocket [[BidiFlow]] using the configured default [[ClientConnectionSettings]], + * configured using the `akka.http.client` config section. + * + * The layer is not reusable and must only be materialized once. + */ + def websocketClientLayer(request: WebsocketRequest, + settings: ClientConnectionSettings, + log: LoggingAdapter): BidiFlow[Message, SslTlsOutbound, SslTlsInbound, Message, Future[WebsocketUpgradeResponse]] = + adaptWsBidiFlow(delegate.websocketClientLayer(request.asScala, settings, log)) + + /** + * Constructs a flow that once materialized establishes a Websocket connection to the given Uri. + * + * The layer is not reusable and must only be materialized once. + */ + def websocketClientFlow(request: WebsocketRequest): Flow[Message, Message, Future[WebsocketUpgradeResponse]] = + adaptWsFlow { + delegate.websocketClientFlow(request.asScala) + } + + /** + * Constructs a flow that once materialized establishes a Websocket connection to the given Uri. + * + * The layer is not reusable and must only be materialized once. + */ + def websocketClientFlow(request: WebsocketRequest, + localAddress: Option[InetSocketAddress], + settings: ClientConnectionSettings, + httpsContext: Option[HttpsContext], + log: LoggingAdapter): Flow[Message, Message, Future[WebsocketUpgradeResponse]] = + adaptWsFlow { + delegate.websocketClientFlow(request.asScala, localAddress, settings, httpsContext, log) + } + + /** + * Runs a single Websocket conversation given a Uri and a flow that represents the client side of the + * Websocket conversation. + */ + def singleWebsocketRequest[T](request: WebsocketRequest, + clientFlow: Flow[Message, Message, T], + materializer: Materializer): Pair[Future[WebsocketUpgradeResponse], T] = + adaptWsResultTuple { + delegate.singleWebsocketRequest( + request.asScala, + adaptWsFlow[T](clientFlow))(materializer) + } + + /** + * Runs a single Websocket conversation given a Uri and a flow that represents the client side of the + * Websocket conversation. + */ + def singleWebsocketRequest[T](request: WebsocketRequest, + clientFlow: Flow[Message, Message, T], + localAddress: Option[InetSocketAddress], + settings: ClientConnectionSettings, + httpsContext: Option[HttpsContext], + log: LoggingAdapter, + materializer: Materializer): Pair[Future[WebsocketUpgradeResponse], T] = + adaptWsResultTuple { + delegate.singleWebsocketRequest( + request.asScala, + adaptWsFlow[T](clientFlow), + localAddress, + settings, + httpsContext, + log)(materializer) + } + /** * Triggers an orderly shutdown of all host connections pools currently maintained by the [[ActorSystem]]. * The returned future is completed when all pools that were live at the time of this method call @@ -517,7 +606,7 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension { def setDefaultClientHttpsContext(context: HttpsContext): Unit = delegate.setDefaultClientHttpsContext(context.asInstanceOf[akka.http.scaladsl.HttpsContext]) - private def adaptTupleFlow[T, Mat](scalaFlow: akka.stream.scaladsl.Flow[(scaladsl.model.HttpRequest, T), (Try[scaladsl.model.HttpResponse], T), Mat]): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], Mat] = { + private def adaptTupleFlow[T, Mat](scalaFlow: stream.scaladsl.Flow[(scaladsl.model.HttpRequest, T), (Try[scaladsl.model.HttpResponse], T), Mat]): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], Mat] = { implicit val _ = JavaMapping.identity[T] JavaMapping.toJava(scalaFlow)(JavaMapping.flowMapping[Pair[HttpRequest, T], (scaladsl.model.HttpRequest, T), Pair[Try[HttpResponse], T], (Try[scaladsl.model.HttpResponse], T), Mat]) } @@ -531,4 +620,25 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension { new BidiFlow( JavaMapping.adapterBidiFlow[HttpRequest, sm.HttpRequest, sm.HttpResponse, HttpResponse] .atop(clientLayer)) + + private def adaptWsBidiFlow(wsLayer: scaladsl.Http.WebsocketClientLayer): BidiFlow[Message, SslTlsOutbound, SslTlsInbound, Message, Future[WebsocketUpgradeResponse]] = + new BidiFlow( + JavaMapping.adapterBidiFlow[Message, sm.ws.Message, sm.ws.Message, Message] + .atopMat(wsLayer)((_, s) ⇒ adaptWsUpgradeResponse(s))) + + private def adaptWsFlow(wsLayer: stream.scaladsl.Flow[sm.ws.Message, sm.ws.Message, Future[scaladsl.Http.WebsocketUpgradeResponse]]): Flow[Message, Message, Future[WebsocketUpgradeResponse]] = + Flow.adapt(JavaMapping.adapterBidiFlow[Message, sm.ws.Message, sm.ws.Message, Message].joinMat(wsLayer)(Keep.right).mapMaterializedValue(adaptWsUpgradeResponse _)) + + private def adaptWsFlow[Mat](javaFlow: Flow[Message, Message, Mat]): stream.scaladsl.Flow[scaladsl.model.ws.Message, scaladsl.model.ws.Message, Mat] = + stream.scaladsl.Flow[scaladsl.model.ws.Message] + .map(Message.adapt) + .viaMat(javaFlow.asScala)(Keep.right) + .map(_.asScala) + + private def adaptWsResultTuple[T](result: (Future[scaladsl.Http.WebsocketUpgradeResponse], T)): Pair[Future[WebsocketUpgradeResponse], T] = + result match { + case (fut, tMat) ⇒ Pair(adaptWsUpgradeResponse(fut), tMat) + } + private def adaptWsUpgradeResponse(responseFuture: Future[scaladsl.Http.WebsocketUpgradeResponse]): Future[WebsocketUpgradeResponse] = + responseFuture.map(WebsocketUpgradeResponse.adapt)(system.dispatcher) } diff --git a/akka-http-core/src/main/scala/akka/http/javadsl/model/ws/WebsocketRequest.scala b/akka-http-core/src/main/scala/akka/http/javadsl/model/ws/WebsocketRequest.scala new file mode 100644 index 0000000000..d895cb6e56 --- /dev/null +++ b/akka-http-core/src/main/scala/akka/http/javadsl/model/ws/WebsocketRequest.scala @@ -0,0 +1,61 @@ +/* + * Copyright (C) 2009-2015 Typesafe Inc. + */ + +package akka.http.javadsl.model.ws + +import akka.http.javadsl.model.{ Uri, HttpHeader } +import akka.http.scaladsl.model.ws.{ WebsocketRequest ⇒ ScalaWebsocketRequest } + +/** + * Represents a Websocket request. Use `WebsocketRequest.create` to create a request + * for a target URI and then use `addHeader` or `requestSubprotocol` to set optional + * details. + */ +abstract class WebsocketRequest { + /** + * Return a copy of this request that contains the given additional header. + */ + def addHeader(header: HttpHeader): WebsocketRequest + + /** + * Return a copy of this request that will require that the server uses the + * given Websocket subprotocol. + */ + def requestSubprotocol(subprotocol: String): WebsocketRequest + + def asScala: ScalaWebsocketRequest +} +object WebsocketRequest { + import akka.http.impl.util.JavaMapping.Implicits._ + + /** + * Creates a WebsocketRequest to a target URI. Use the methods on `WebsocketRequest` + * to specify further details. + */ + def create(uri: Uri): WebsocketRequest = + wrap(ScalaWebsocketRequest(uri.asScala)) + + /** + * Creates a WebsocketRequest to a target URI. Use the methods on `WebsocketRequest` + * to specify further details. + */ + def create(uriString: String): WebsocketRequest = + create(Uri.create(uriString)) + + /** + * Wraps a Scala version of WebsocketRequest. + */ + def wrap(scalaRequest: ScalaWebsocketRequest): WebsocketRequest = + new WebsocketRequest { + def addHeader(header: HttpHeader): WebsocketRequest = + transform(s ⇒ s.copy(extraHeaders = s.extraHeaders :+ header.asScala)) + def requestSubprotocol(subprotocol: String): WebsocketRequest = + transform(_.copy(subprotocol = Some(subprotocol))) + + def asScala: ScalaWebsocketRequest = scalaRequest + + def transform(f: ScalaWebsocketRequest ⇒ ScalaWebsocketRequest): WebsocketRequest = + wrap(f(asScala)) + } +} diff --git a/akka-http-core/src/main/scala/akka/http/javadsl/model/ws/WebsocketUpgradeResponse.scala b/akka-http-core/src/main/scala/akka/http/javadsl/model/ws/WebsocketUpgradeResponse.scala new file mode 100644 index 0000000000..f6490d21a0 --- /dev/null +++ b/akka-http-core/src/main/scala/akka/http/javadsl/model/ws/WebsocketUpgradeResponse.scala @@ -0,0 +1,58 @@ +/* + * Copyright (C) 2009-2015 Typesafe Inc. + */ + +package akka.http.javadsl.model.ws + +import akka.http.javadsl.model.HttpResponse +import akka.http.scaladsl +import akka.http.scaladsl.Http.{ InvalidUpgradeResponse, ValidUpgrade } +import akka.japi.Option + +/** + * Represents an upgrade response for a Websocket upgrade request. Can either be valid, in which + * case the `chosenSubprotocol` method is valid, or if invalid, the `invalidationReason` method + * can be used to find out why the upgrade failed. + */ +trait WebsocketUpgradeResponse { + def isValid: Boolean + + /** + * Returns the response object as received from the server for further inspection. + */ + def response: HttpResponse + + /** + * If valid, returns `Some(subprotocol)` (if any was requested), or `None` if none was + * chosen or offered. + */ + def chosenSubprotocol: Option[String] + + /** + * If invalid, the reason why the server's upgrade response could not be accepted. + */ + def invalidationReason: String +} + +object WebsocketUpgradeResponse { + import akka.http.impl.util.JavaMapping.Implicits._ + def adapt(scalaResponse: scaladsl.Http.WebsocketUpgradeResponse): WebsocketUpgradeResponse = + scalaResponse match { + case ValidUpgrade(response, chosen) ⇒ + new WebsocketUpgradeResponse { + def isValid: Boolean = true + def response: HttpResponse = response + def chosenSubprotocol: Option[String] = chosen.asJava + def invalidationReason: String = + throw new UnsupportedOperationException("invalidationReason must not be called for valid response") + } + case InvalidUpgradeResponse(response, cause) ⇒ + new WebsocketUpgradeResponse { + def isValid: Boolean = false + def response: HttpResponse = response + def chosenSubprotocol: Option[String] = throw new UnsupportedOperationException("chosenSubprotocol must not be called for valid response") + def invalidationReason: String = cause + } + } + +} \ No newline at end of file diff --git a/akka-http-core/src/test/java/akka/http/javadsl/WSEchoTestClientApp.java b/akka-http-core/src/test/java/akka/http/javadsl/WSEchoTestClientApp.java new file mode 100644 index 0000000000..78d0e2cd30 --- /dev/null +++ b/akka-http-core/src/test/java/akka/http/javadsl/WSEchoTestClientApp.java @@ -0,0 +1,83 @@ +/* + * Copyright (C) 2009-2015 Typesafe Inc. + */ + +package akka.http.javadsl; + +import akka.actor.ActorSystem; +import akka.dispatch.Futures; +import akka.http.javadsl.model.ws.Message; +import akka.http.javadsl.model.ws.TextMessage; +import akka.http.javadsl.model.ws.WebsocketRequest; +import akka.japi.function.Function; +import akka.stream.ActorMaterializer; +import akka.stream.Materializer; +import akka.stream.javadsl.Flow; +import akka.stream.javadsl.Keep; +import akka.stream.javadsl.Sink; +import akka.stream.javadsl.Source; +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; +import scala.runtime.BoxedUnit; + +import java.util.Arrays; +import java.util.List; + +public class WSEchoTestClientApp { + private static final Function messageStringifier = new Function() { + @Override + public String apply(Message msg) throws Exception { + if (msg.isText() && msg.asTextMessage().isStrict()) + return msg.asTextMessage().getStrictText(); + else + throw new IllegalArgumentException("Unexpected message "+msg); + } + }; + + public static void main(String[] args) throws Exception { + ActorSystem system = ActorSystem.create(); + + try { + final Materializer materializer = ActorMaterializer.create(system); + + final Future ignoredMessage = Futures.successful((Message) TextMessage.create("blub")); + final Future delayedCompletion = + akka.pattern.Patterns.after( + FiniteDuration.apply(1, "second"), + system.scheduler(), + system.dispatcher(), + ignoredMessage); + + Source echoSource = + Source.from(Arrays.asList( + TextMessage.create("abc"), + TextMessage.create("def"), + TextMessage.create("ghi") + )).concat(Source.from(delayedCompletion).drop(1)); + + Sink>> echoSink = + Flow.of(Message.class) + .map(messageStringifier) + .grouped(1000) + .toMat(Sink.>head(), Keep.>>right()); + + Flow>> echoClient = + Flow.wrap(echoSink, echoSource, Keep.>, BoxedUnit>left()); + + Future> result = + Http.get(system).singleWebsocketRequest( + WebsocketRequest.create("ws://echo.websocket.org"), + echoClient, + materializer + ).second(); + + List messages = Await.result(result, FiniteDuration.apply(10, "second")); + System.out.println("Collected " + messages.size() + " messages:"); + for (String msg: messages) + System.out.println(msg); + } finally { + system.shutdown(); + } + } +}