diff --git a/akka-docs/rst/scala/code/docs/http/scaladsl/server/directives/JsonStreamingExamplesSpec.scala b/akka-docs/rst/scala/code/docs/http/scaladsl/server/directives/JsonStreamingExamplesSpec.scala index 6a6fffe86f..b354504829 100644 --- a/akka-docs/rst/scala/code/docs/http/scaladsl/server/directives/JsonStreamingExamplesSpec.scala +++ b/akka-docs/rst/scala/code/docs/http/scaladsl/server/directives/JsonStreamingExamplesSpec.scala @@ -105,7 +105,7 @@ class JsonStreamingExamplesSpec extends RoutingSpec { val route = path("metrics") { // [4] extract Source[Measurement, _] - entity(stream[Measurement]) { measurements => + entity(asSourceOf[Measurement]) { measurements => println("measurements = " + measurements) val measurementsSubmitted: Future[Int] = measurements diff --git a/akka-http-core/src/main/java/akka/http/javadsl/model/ContentTypes.java b/akka-http-core/src/main/java/akka/http/javadsl/model/ContentTypes.java index 61b3a1c728..25c403ee63 100644 --- a/akka-http-core/src/main/java/akka/http/javadsl/model/ContentTypes.java +++ b/akka-http-core/src/main/java/akka/http/javadsl/model/ContentTypes.java @@ -25,6 +25,9 @@ public final class ContentTypes { public static final ContentType.WithCharset TEXT_XML_UTF8 = akka.http.scaladsl.model.ContentTypes.text$divxml$u0028UTF$minus8$u0029(); + public static final ContentType.WithCharset TEXT_CSV_UTF8 = + akka.http.scaladsl.model.ContentTypes.text$divcsv$u0028UTF$minus8$u0029(); + public static ContentType.Binary create(MediaType.Binary mediaType) { return ContentType$.MODULE$.apply((akka.http.scaladsl.model.MediaType.Binary) mediaType); } diff --git a/akka-http-tests/src/test/java/akka/http/javadsl/server/JavaTestServer.java b/akka-http-tests/src/test/java/akka/http/javadsl/server/JavaTestServer.java index 33f2e98664..65ca1cec44 100644 --- a/akka-http-tests/src/test/java/akka/http/javadsl/server/JavaTestServer.java +++ b/akka-http-tests/src/test/java/akka/http/javadsl/server/JavaTestServer.java @@ -3,15 +3,20 @@ */ package akka.http.javadsl.server; +import akka.NotUsed; import akka.actor.ActorSystem; import akka.http.javadsl.ConnectHttp; import akka.http.javadsl.Http; import akka.http.javadsl.ServerBinding; +import akka.http.javadsl.marshallers.jackson.Jackson; +import akka.http.javadsl.model.HttpEntity; import akka.http.javadsl.model.HttpRequest; import akka.http.javadsl.model.HttpResponse; import akka.http.javadsl.model.StatusCodes; +import akka.http.javadsl.common.JsonSourceRenderingModes; import akka.stream.ActorMaterializer; import akka.stream.javadsl.Flow; +import akka.stream.javadsl.Source; import scala.concurrent.duration.Duration; import scala.runtime.BoxedUnit; @@ -55,18 +60,41 @@ public class JavaTestServer extends AllDirectives { // or import static Directiv ); final Route crash = path("crash", () -> - path("scala", () -> completeOKWithFutureString(akka.dispatch.Futures.failed(new Exception("Boom!")))).orElse( - path("java", () -> completeOKWithFutureString(CompletableFuture.supplyAsync(() -> { throw new RuntimeException("Boom!"); })))) + path("scala", () -> completeOKWithFutureString(akka.dispatch.Futures.failed(new Exception("Boom!")))).orElse( + path("java", () -> completeOKWithFutureString(CompletableFuture.supplyAsync(() -> { throw new RuntimeException("Boom!"); })))) ); + final Unmarshaller JavaTweets = Jackson.unmarshaller(JavaTweet.class); + final Route tweets = path("tweets", () -> + get(() -> + parameter(StringUnmarshallers.INTEGER, "n", n -> { + final Source tws = Source.repeat(new JavaTweet("Hello World!")).take(n); + return completeOKWithSource(tws, Jackson.marshaller(), JsonSourceRenderingModes.arrayCompact()); + }) + ).orElse( + post(() -> + extractMaterializer(mat -> + entityasSourceOf(JavaTweets, null, sourceOfTweets -> { + final CompletionStage tweetsCount = sourceOfTweets.runFold(0, (acc, tweet) -> acc + 1, mat); + return onComplete(tweetsCount, c -> complete("Total number of tweets: " + c)); + }) + ) + )) + ); + final Route inner = path("inner", () -> getFromResourceDirectory("someDir") ); - return get(() -> - index.orElse(secure).orElse(ping).orElse(crash).orElse(inner).orElse(requestTimeout) - ); + return index + .orElse(secure) + .orElse(ping) + .orElse(crash) + .orElse(inner) + .orElse(requestTimeout) + .orElse(tweets) + ; } private void silentSleep(int millis) { @@ -113,7 +141,7 @@ public class JavaTestServer extends AllDirectives { // or import static Directiv final Flow flow = createRoute().flow(system, mat); final CompletionStage binding = - Http.get(system).bindAndHandle(flow, ConnectHttp.toHost("127.0.0.1"), mat); + Http.get(system).bindAndHandle(flow, ConnectHttp.toHost("127.0.0.1", 8080), mat); System.console().readLine("Press [ENTER] to quit..."); shutdown(binding); @@ -131,4 +159,21 @@ public class JavaTestServer extends AllDirectives { // or import static Directiv } }); } + + private static final class JavaTweet { + private String message; + + public JavaTweet(String message) { + this.message = message; + } + + public void setMessage(String message) { + this.message = message; + } + + public String getMessage() { + return message; + } + + } } diff --git a/akka-http-tests/src/test/scala/akka/http/scaladsl/server/IntegrationRoutingSpec.scala b/akka-http-tests/src/test/scala/akka/http/scaladsl/server/IntegrationRoutingSpec.scala index a931209d7f..223b6b2d9b 100644 --- a/akka-http-tests/src/test/scala/akka/http/scaladsl/server/IntegrationRoutingSpec.scala +++ b/akka-http-tests/src/test/scala/akka/http/scaladsl/server/IntegrationRoutingSpec.scala @@ -18,6 +18,7 @@ import scala.concurrent.Await private[akka] trait IntegrationRoutingSpec extends WordSpecLike with Matchers with BeforeAndAfterAll with Directives with RequestBuilding with ScalaFutures with IntegrationPatience { + import IntegrationRoutingSpec._ implicit val system = ActorSystem(AkkaSpec.getCallerName(getClass)) implicit val mat = ActorMaterializer() @@ -31,8 +32,6 @@ private[akka] trait IntegrationRoutingSpec extends WordSpecLike with Matchers wi def ~!>(route: Route) = new Prepped(request, route) } - final case class Prepped(request: HttpRequest, route: Route) - implicit class Checking(p: Prepped) { def ~!>(checking: HttpResponse ⇒ Unit) = { val (_, host, port) = TestUtils.temporaryServerHostnameAndPort() @@ -47,3 +46,7 @@ private[akka] trait IntegrationRoutingSpec extends WordSpecLike with Matchers wi } } + +object IntegrationRoutingSpec { + final case class Prepped(request: HttpRequest, route: Route) +} diff --git a/akka-http-tests/src/test/scala/akka/http/scaladsl/server/TestServer.scala b/akka-http-tests/src/test/scala/akka/http/scaladsl/server/TestServer.scala index ecca3305dc..b4901750d5 100644 --- a/akka-http-tests/src/test/scala/akka/http/scaladsl/server/TestServer.scala +++ b/akka-http-tests/src/test/scala/akka/http/scaladsl/server/TestServer.scala @@ -4,14 +4,18 @@ package akka.http.scaladsl.server +import akka.NotUsed import akka.http.scaladsl.marshallers.xml.ScalaXmlSupport -import akka.http.scaladsl.model.{ StatusCodes, HttpResponse } +import akka.http.scaladsl.model.{ HttpResponse, StatusCodes } import akka.http.scaladsl.server.directives.Credentials -import com.typesafe.config.{ ConfigFactory, Config } +import com.typesafe.config.{ Config, ConfigFactory } import akka.actor.ActorSystem import akka.stream._ import akka.stream.scaladsl._ import akka.http.scaladsl.Http +import akka.http.scaladsl.common.{ FramingWithContentType, JsonSourceRenderingModes, SourceRenderingMode } +import akka.http.scaladsl.marshalling.ToResponseMarshallable + import scala.concurrent.duration._ import scala.io.StdIn @@ -21,10 +25,26 @@ object TestServer extends App { akka.log-dead-letters = off akka.stream.materializer.debug.fuzzing-mode = off """) + implicit val system = ActorSystem("ServerTest", testConf) import system.dispatcher implicit val materializer = ActorMaterializer() + // --------- json streaming --------- + import spray.json.DefaultJsonProtocol._ + import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._ + final case class Tweet(message: String) + implicit val tweetFormat = jsonFormat1(Tweet) + + // FIXME: Need to be able to support composive framing with content type (!!!!!!!) + import akka.http.scaladsl.server.EntityStreamingSupport._ + /* override if extending EntityStreamingSupport */ + implicit val incomingEntityStreamFraming: FramingWithContentType = bracketCountingJsonFraming(128) + /* override if extending EntityStreamingSupport */ + implicit val outgoingEntityStreamRendering: SourceRenderingMode = JsonSourceRenderingModes.LineByLine + + // --------- end of json streaming --------- + import ScalaXmlSupport._ import Directives._ @@ -32,7 +52,8 @@ object TestServer extends App { case p @ Credentials.Provided(name) if p.verify(name + "-password") ⇒ name } - val bindingFuture = Http().bindAndHandle({ + // format: OFF + val routes = { get { path("") { withRequestTimeout(1.milli, _ ⇒ HttpResponse( @@ -42,19 +63,44 @@ object TestServer extends App { complete(index) } } ~ - path("secure") { - authenticateBasicPF("My very secure site", auth) { user ⇒ - complete(Hello { user }. Access has been granted!) - } - } ~ - path("ping") { - complete("PONG!") - } ~ - path("crash") { - complete(sys.error("BOOM!")) + path("secure") { + authenticateBasicPF("My very secure site", auth) { user ⇒ + complete( + Hello + + {user} + + . Access has been granted! + ) } - } ~ pathPrefix("inner")(getFromResourceDirectory("someDir")) - }, interface = "localhost", port = 8080) + } ~ + path("ping") { + complete("PONG!") + } ~ + path("crash") { + complete(sys.error("BOOM!")) + } ~ + path("tweet") { + complete(Tweet("Hello, world!")) + } ~ + (path("tweets") & parameter('n.as[Int])) { n => + get { + val tweets = Source.repeat(Tweet("Hello, world!")).take(n) + complete(ToResponseMarshallable(tweets)) + } ~ + post { + entity(asSourceOf[Tweet]) { tweets ⇒ + // entity(asSourceOf[Tweet](bracketCountingJsonFraming(1024))) { tweets: Source[Tweet, NotUsed] ⇒ + complete(s"Total tweets received: " + tweets.runFold(0)({ case (acc, t) => acc + 1 })) + } + } + } + } ~ + pathPrefix("inner")(getFromResourceDirectory("someDir")) + } + // format: ON + + val bindingFuture = Http().bindAndHandle(routes, interface = "0.0.0.0", port = 8080) println(s"Server online at http://localhost:8080/\nPress RETURN to stop...") StdIn.readLine() diff --git a/akka-http/src/main/scala/akka/http/javadsl/common/CsvSourceRenderingMode.scala b/akka-http/src/main/scala/akka/http/javadsl/common/CsvSourceRenderingMode.scala new file mode 100644 index 0000000000..d755dd6bf2 --- /dev/null +++ b/akka-http/src/main/scala/akka/http/javadsl/common/CsvSourceRenderingMode.scala @@ -0,0 +1,41 @@ +/* + * Copyright (C) 2016 Lightbend Inc. + */ + +package akka.http.javadsl.common + +import akka.http.javadsl.model.ContentType.WithCharset +import akka.http.javadsl.model.ContentTypes +import akka.util.ByteString + +/** + * Specialised rendering mode for streaming elements as CSV. + */ +trait CsvSourceRenderingMode extends SourceRenderingMode { + override val contentType: WithCharset = + ContentTypes.TEXT_CSV_UTF8 +} + +object CsvSourceRenderingModes { + + /** + * Render sequence of values as row-by-row ('\n' separated) series of values. + */ + val create: CsvSourceRenderingMode = + new CsvSourceRenderingMode { + override def between: ByteString = ByteString("\n") + override def end: ByteString = ByteString.empty + override def start: ByteString = ByteString.empty + } + + /** + * Render sequence of values as row-by-row (with custom row separator, + * e.g. if you need to use '\r\n' instead of '\n') series of values. + */ + def custom(rowSeparator: String): CsvSourceRenderingMode = + new CsvSourceRenderingMode { + override def between: ByteString = ByteString(rowSeparator) + override def end: ByteString = ByteString.empty + override def start: ByteString = ByteString.empty + } +} diff --git a/akka-http/src/main/scala/akka/http/javadsl/common/FramingWithContentType.scala b/akka-http/src/main/scala/akka/http/javadsl/common/FramingWithContentType.scala new file mode 100644 index 0000000000..127c9bf890 --- /dev/null +++ b/akka-http/src/main/scala/akka/http/javadsl/common/FramingWithContentType.scala @@ -0,0 +1,24 @@ +/* + * Copyright (C) 2016 Lightbend Inc. + */ + +package akka.http.javadsl.common + +import akka.http.javadsl.model.ContentTypeRange +import akka.stream.javadsl.Framing + +trait FramingWithContentType extends Framing { self ⇒ + import akka.http.impl.util.JavaMapping.Implicits._ + + override def asScala: akka.http.scaladsl.common.FramingWithContentType = + this match { + case f: akka.http.scaladsl.common.FramingWithContentType ⇒ f + case _ ⇒ new akka.http.scaladsl.common.FramingWithContentType { + override def flow = self.getFlow.asScala + override def supported = self.supported.asScala + } + } + + def supported: ContentTypeRange + def matches(ct: akka.http.javadsl.model.ContentType): Boolean = supported.matches(ct) +} diff --git a/akka-http/src/main/scala/akka/http/javadsl/common/JsonSourceRenderingMode.scala b/akka-http/src/main/scala/akka/http/javadsl/common/JsonSourceRenderingMode.scala new file mode 100644 index 0000000000..b5fe32fdf9 --- /dev/null +++ b/akka-http/src/main/scala/akka/http/javadsl/common/JsonSourceRenderingMode.scala @@ -0,0 +1,101 @@ +/* + * Copyright (C) 2016 Lightbend Inc. + */ + +package akka.http.javadsl.common + +import akka.http.javadsl.model.{ ContentType, ContentTypes } + +/** + * Specialised rendering mode for streaming elements as JSON. + * + * See also: JSON Streaming on Wikipedia. + */ +trait JsonSourceRenderingMode extends SourceRenderingMode { + override val contentType: ContentType.WithFixedCharset = + ContentTypes.APPLICATION_JSON +} + +/** + * Provides default JSON rendering modes. + */ +object JsonSourceRenderingModes { + + /** + * Most compact rendering mode. + * It does not intersperse any separator between the signalled elements. + * + * It can be used with [[akka.stream.javadsl.JsonFraming.bracketCounting]]. + * + * {{{ + * {"id":42}{"id":43}{"id":44} + * }}} + */ + val compact = akka.http.scaladsl.common.JsonSourceRenderingModes.Compact + + /** + * Simple rendering mode, similar to [[compact]] however interspersing elements with a `\n` character. + * + * {{{ + * {"id":42},{"id":43},{"id":44} + * }}} + */ + val compactCommaSeparated = akka.http.scaladsl.common.JsonSourceRenderingModes.CompactCommaSeparated + + /** + * Rendering mode useful when the receiving end expects a valid JSON Array. + * It can be useful when the client wants to detect when the stream has been successfully received in-full, + * which it can determine by seeing the terminating `]` character. + * + * The framing's terminal `]` will ONLY be emitted if the stream has completed successfully, + * in other words - the stream has been emitted completely, without errors occuring before the final element has been signaled. + * + * {{{ + * [{"id":42},{"id":43},{"id":44}] + * }}} + */ + val arrayCompact = akka.http.scaladsl.common.JsonSourceRenderingModes.ArrayCompact + + /** + * Rendering mode useful when the receiving end expects a valid JSON Array. + * It can be useful when the client wants to detect when the stream has been successfully received in-full, + * which it can determine by seeing the terminating `]` character. + * + * The framing's terminal `]` will ONLY be emitted if the stream has completed successfully, + * in other words - the stream has been emitted completely, without errors occuring before the final element has been signaled. + * + * {{{ + * [{"id":42}, + * {"id":43}, + * {"id":44}] + * }}} + */ + val arrayLineByLine = akka.http.scaladsl.common.JsonSourceRenderingModes.ArrayLineByLine + + /** + * Recommended rendering mode. + * + * It is a nice balance between valid and human-readable as well as resonably small size overhead (just the `\n` between elements). + * A good example of API's using this syntax is Twitter's Firehose (last verified at 1.1 version of that API). + * + * {{{ + * {"id":42} + * {"id":43} + * {"id":44} + * }}} + */ + val lineByLine = akka.http.scaladsl.common.JsonSourceRenderingModes.LineByLine + + /** + * Simple rendering mode interspersing each pair of elements with both `,\n`. + * Picking the [[lineByLine]] format may be preferable, as it is slightly simpler to parse - each line being a valid json object (no need to trim the comma). + * + * {{{ + * {"id":42}, + * {"id":43}, + * {"id":44} + * }}} + */ + val lineByLineCommaSeparated = akka.http.scaladsl.common.JsonSourceRenderingModes.LineByLineCommaSeparated + +} diff --git a/akka-http/src/main/scala/akka/http/javadsl/common/SourceRenderingMode.scala b/akka-http/src/main/scala/akka/http/javadsl/common/SourceRenderingMode.scala new file mode 100644 index 0000000000..5144f336f6 --- /dev/null +++ b/akka-http/src/main/scala/akka/http/javadsl/common/SourceRenderingMode.scala @@ -0,0 +1,22 @@ +/* + * Copyright (C) 2016 Lightbend Inc. + */ + +package akka.http.javadsl.common + +import akka.http.javadsl.model.ContentType +import akka.util.ByteString + +/** + * Defines how to render a [[akka.stream.javadsl.Source]] into a raw [[ByteString]] + * output. + * + * This can be used to render a source into an [[akka.http.scaladsl.model.HttpEntity]]. + */ +trait SourceRenderingMode { + def contentType: ContentType + + def start: ByteString + def between: ByteString + def end: ByteString +} diff --git a/akka-http/src/main/scala/akka/http/javadsl/marshalling/Marshaller.scala b/akka-http/src/main/scala/akka/http/javadsl/marshalling/Marshaller.scala index c1dc4ab681..dfd7388652 100644 --- a/akka-http/src/main/scala/akka/http/javadsl/marshalling/Marshaller.scala +++ b/akka-http/src/main/scala/akka/http/javadsl/marshalling/Marshaller.scala @@ -16,6 +16,7 @@ import akka.japi.Util import akka.util.ByteString import scala.concurrent.ExecutionContext +import scala.annotation.unchecked.uncheckedVariance import scala.language.implicitConversions object Marshaller { @@ -56,29 +57,29 @@ object Marshaller { // TODO make sure these are actually usable in a sane way def wrapEntity[A, C](f: function.BiFunction[ExecutionContext, C, A], m: Marshaller[A, RequestEntity], mediaType: MediaType): Marshaller[C, RequestEntity] = { - val scalaMarshaller = m.asScalaToEntityMarshaller + val scalaMarshaller = m.asScalaCastOutput fromScala(scalaMarshaller.wrapWithEC(mediaType.asScala) { ctx ⇒ c: C ⇒ f(ctx, c) }(ContentTypeOverrider.forEntity)) } def wrapEntity[A, C, E <: RequestEntity](f: function.Function[C, A], m: Marshaller[A, E], mediaType: MediaType): Marshaller[C, RequestEntity] = { - val scalaMarshaller = m.asScalaToEntityMarshaller + val scalaMarshaller = m.asScalaCastOutput fromScala(scalaMarshaller.wrap(mediaType.asScala)((in: C) ⇒ f.apply(in))(ContentTypeOverrider.forEntity)) } def entityToOKResponse[A](m: Marshaller[A, _ <: RequestEntity]): Marshaller[A, HttpResponse] = { - fromScala(marshalling.Marshaller.fromToEntityMarshaller[A]()(m.asScalaToEntityMarshaller)) + fromScala(marshalling.Marshaller.fromToEntityMarshaller[A]()(m.asScalaCastOutput)) } def entityToResponse[A, R <: RequestEntity](status: StatusCode, m: Marshaller[A, R]): Marshaller[A, HttpResponse] = { - fromScala(marshalling.Marshaller.fromToEntityMarshaller[A](status.asScala)(m.asScalaToEntityMarshaller)) + fromScala(marshalling.Marshaller.fromToEntityMarshaller[A](status.asScala)(m.asScalaCastOutput)) } def entityToResponse[A](status: StatusCode, headers: java.lang.Iterable[HttpHeader], m: Marshaller[A, _ <: RequestEntity]): Marshaller[A, HttpResponse] = { - fromScala(marshalling.Marshaller.fromToEntityMarshaller[A](status.asScala, Util.immutableSeq(headers).map(_.asScala))(m.asScalaToEntityMarshaller)) // TODO can we avoid the map() ? + fromScala(marshalling.Marshaller.fromToEntityMarshaller[A](status.asScala, Util.immutableSeq(headers).map(_.asScala))(m.asScalaCastOutput)) // TODO can we avoid the map() ? } def entityToOKResponse[A](headers: java.lang.Iterable[HttpHeader], m: Marshaller[A, _ <: RequestEntity]): Marshaller[A, HttpResponse] = { - fromScala(marshalling.Marshaller.fromToEntityMarshaller[A](headers = Util.immutableSeq(headers).map(_.asScala))(m.asScalaToEntityMarshaller)) // TODO avoid the map() + fromScala(marshalling.Marshaller.fromToEntityMarshaller[A](headers = Util.immutableSeq(headers).map(_.asScala))(m.asScalaCastOutput)) // TODO avoid the map() } // these are methods not varargs to avoid call site warning about unchecked type params @@ -140,13 +141,14 @@ object Marshaller { m.asScala.map(_.asScala) } -class Marshaller[A, B] private (implicit val asScala: marshalling.Marshaller[A, B]) { +class Marshaller[-A, +B] private (implicit val asScala: marshalling.Marshaller[A, B]) { import Marshaller.fromScala + /** INTERNAL API: involves unsafe cast (however is very fast) */ // TODO would be nice to not need this special case - def asScalaToEntityMarshaller[C]: marshalling.Marshaller[A, C] = asScala.asInstanceOf[marshalling.Marshaller[A, C]] + private[akka] def asScalaCastOutput[C]: marshalling.Marshaller[A, C] = asScala.asInstanceOf[marshalling.Marshaller[A, C]] - def map[C](f: function.Function[B, C]): Marshaller[A, C] = fromScala(asScala.map(f.apply)) + def map[C](f: function.Function[B @uncheckedVariance, C]): Marshaller[A, C] = fromScala(asScala.map(f.apply)) - def compose[C](f: function.Function[C, A]): Marshaller[C, B] = fromScala(asScala.compose(f.apply)) + def compose[C](f: function.Function[C, A @uncheckedVariance]): Marshaller[C, B] = fromScala(asScala.compose(f.apply)) } diff --git a/akka-http/src/main/scala/akka/http/javadsl/server/Directives.scala b/akka-http/src/main/scala/akka/http/javadsl/server/Directives.scala index 7bc2100a2d..2f966a1a7e 100644 --- a/akka-http/src/main/scala/akka/http/javadsl/server/Directives.scala +++ b/akka-http/src/main/scala/akka/http/javadsl/server/Directives.scala @@ -5,11 +5,11 @@ package akka.http.javadsl.server import akka.http.impl.util.JavaMapping -import akka.http.javadsl.server.directives.TimeoutDirectives +import akka.http.javadsl.server.directives.{ FramedEntityStreamingDirectives, TimeoutDirectives } import scala.annotation.varargs -abstract class AllDirectives extends TimeoutDirectives +abstract class AllDirectives extends FramedEntityStreamingDirectives /** * INTERNAL API diff --git a/akka-http/src/main/scala/akka/http/javadsl/server/JsonEntityStreaming.scala b/akka-http/src/main/scala/akka/http/javadsl/server/JsonEntityStreaming.scala new file mode 100644 index 0000000000..529ae2dea3 --- /dev/null +++ b/akka-http/src/main/scala/akka/http/javadsl/server/JsonEntityStreaming.scala @@ -0,0 +1,9 @@ +/* + * Copyright (C) 2009-2015 Typesafe Inc. + */ +package akka.http.javadsl.server + +class JsonEntityStreaming { + +} + diff --git a/akka-http/src/main/scala/akka/http/javadsl/server/RoutingJavaMapping.scala b/akka-http/src/main/scala/akka/http/javadsl/server/RoutingJavaMapping.scala index 1cb892cfd8..dda99cbb75 100644 --- a/akka-http/src/main/scala/akka/http/javadsl/server/RoutingJavaMapping.scala +++ b/akka-http/src/main/scala/akka/http/javadsl/server/RoutingJavaMapping.scala @@ -10,7 +10,9 @@ import akka.http.impl.util.JavaMapping._ import akka.http.impl.util._ import akka.http.{ javadsl, scaladsl } import akka.http.scaladsl.server.{ directives ⇒ sdirectives } +import akka.http.scaladsl.{ common ⇒ scommon } import akka.http.javadsl.server.{ directives ⇒ jdirectives } +import akka.http.javadsl.{ common ⇒ jcommon } import scala.collection.immutable /** @@ -43,6 +45,8 @@ private[http] object RoutingJavaMapping { } implicit object convertRouteResult extends Inherited[javadsl.server.RouteResult, scaladsl.server.RouteResult] + implicit object convertSourceRenderingMode extends Inherited[jcommon.SourceRenderingMode, scommon.SourceRenderingMode] + implicit object convertDirectoryRenderer extends Inherited[jdirectives.DirectoryRenderer, sdirectives.FileAndResourceDirectives.DirectoryRenderer] implicit object convertContentTypeResolver extends Inherited[jdirectives.ContentTypeResolver, sdirectives.ContentTypeResolver] implicit object convertDirectoryListing extends Inherited[jdirectives.DirectoryListing, sdirectives.DirectoryListing] diff --git a/akka-http/src/main/scala/akka/http/javadsl/server/directives/FramedEntityStreamingDirectives.scala b/akka-http/src/main/scala/akka/http/javadsl/server/directives/FramedEntityStreamingDirectives.scala index ec05f046e8..e39288a8b2 100644 --- a/akka-http/src/main/scala/akka/http/javadsl/server/directives/FramedEntityStreamingDirectives.scala +++ b/akka-http/src/main/scala/akka/http/javadsl/server/directives/FramedEntityStreamingDirectives.scala @@ -3,49 +3,87 @@ */ package akka.http.javadsl.server.directives -import akka.http.javadsl.model.{ContentType, HttpEntity} +import akka.http.javadsl.model.{ ContentType, HttpEntity } import akka.util.ByteString -import java.util.{List => JList, Map => JMap} +import java.util.{ List ⇒ JList, Map ⇒ JMap } import java.util.AbstractMap.SimpleImmutableEntry import java.util.Optional -import java.util.function.{Function => JFunction} +import java.util.function.{ Function ⇒ JFunction } import akka.NotUsed import scala.collection.JavaConverters._ import akka.http.impl.util.JavaMapping.Implicits._ -import akka.http.javadsl.server.directives.FramedEntityStreamingDirectives.SourceRenderingMode -import akka.http.javadsl.server.{Route, Unmarshaller} -import akka.http.scaladsl.marshalling.ToResponseMarshallable -import akka.http.scaladsl.server.{FramingWithContentType, Directives => D} -import akka.http.scaladsl.server.directives.ParameterDirectives._ +import akka.http.javadsl.common.{ FramingWithContentType, SourceRenderingMode } +import akka.http.javadsl.server.{ Marshaller, Route, Unmarshaller } +import akka.http.javadsl.model._ +import akka.http.scaladsl.marshalling.{ ToResponseMarshallable, ToResponseMarshaller } +import akka.http.scaladsl.server.{ Directives ⇒ D } +import akka.http.scaladsl.unmarshalling import akka.stream.javadsl.Source -import scala.compat.java8.OptionConverters - /** EXPERIMENTAL API */ -trait FramedEntityStreamingDirectives { - - def entityAsStream[T](clazz: Class[T], um: Unmarshaller[_ >: HttpEntity, T], framing: FramingWithContentType, - inner: java.util.function.Function[Source[T, NotUsed], Route]): Route = RouteAdapter { - D.entity[T](D.stream[T](um, framing)) { s => - +abstract class FramedEntityStreamingDirectives extends TimeoutDirectives { + // important import, as we implicitly resolve some marshallers inside the below directives + import akka.http.scaladsl.server.directives.FramedEntityStreamingDirectives._ + + @CorrespondsTo("asSourceOf") + def entityasSourceOf[T](um: Unmarshaller[HttpEntity, T], framing: FramingWithContentType, + inner: java.util.function.Function[Source[T, NotUsed], Route]): Route = RouteAdapter { + val sum = um.asScalaCastInput[akka.http.scaladsl.model.HttpEntity] + D.entity(D.asSourceOf[T](framing.asScala)(sum)) { s: akka.stream.scaladsl.Source[T, NotUsed] ⇒ + inner(s.asJava).delegate } - ??? } - - def completeWithSource[T](source: Source[T, Any], rendering: SourceRenderingMode): Route = RouteAdapter { + + @CorrespondsTo("asSourceOfAsync") + def entityAsSourceAsyncOf[T]( + parallelism: Int, + um: Unmarshaller[HttpEntity, T], framing: FramingWithContentType, + inner: java.util.function.Function[Source[T, NotUsed], Route]): Route = RouteAdapter { + val sum = um.asScalaCastInput[akka.http.scaladsl.model.HttpEntity] + D.entity(D.asSourceOfAsync[T](parallelism, framing.asScala)(sum)) { s: akka.stream.scaladsl.Source[T, NotUsed] ⇒ + inner(s.asJava).delegate + } + } + + @CorrespondsTo("asSourceOfAsyncUnordered") + def entityAsSourceAsyncUnorderedOf[T]( + parallelism: Int, + um: Unmarshaller[HttpEntity, T], framing: FramingWithContentType, + inner: java.util.function.Function[Source[T, NotUsed], Route]): Route = RouteAdapter { + val sum = um.asScalaCastInput[akka.http.scaladsl.model.HttpEntity] + D.entity(D.asSourceOfAsyncUnordered[T](parallelism, framing.asScala)(sum)) { s: akka.stream.scaladsl.Source[T, NotUsed] ⇒ + inner(s.asJava).delegate + } + } + + // implicit used internally, Java caller does not benefit or use it + @CorrespondsTo("complete") + def completeWithSource[T, M](implicit source: Source[T, M], m: Marshaller[T, ByteString], rendering: SourceRenderingMode): Route = RouteAdapter { + import akka.http.scaladsl.marshalling.PredefinedToResponseMarshallers._ + implicit val mm = _sourceMarshaller(m.map(ByteStringAsEntityFn), rendering) val response = ToResponseMarshallable(source) D.complete(response) } -} -object FramedEntityStreamingDirectives extends FramedEntityStreamingDirectives { - trait SourceRenderingMode { - def getContentType: ContentType + @CorrespondsTo("complete") + def completeOKWithSource[T, M](implicit source: Source[T, M], m: Marshaller[T, RequestEntity], rendering: SourceRenderingMode): Route = RouteAdapter { + implicit val mm = _sourceMarshaller[T, M](m, rendering) + val response = ToResponseMarshallable(source) + D.complete(response) + } - def start: ByteString - def between: ByteString - def end: ByteString + implicit private def _sourceMarshaller[T, M](implicit m: Marshaller[T, HttpEntity], rendering: SourceRenderingMode) = { + import akka.http.javadsl.server.RoutingJavaMapping._ + import akka.http.javadsl.server.RoutingJavaMapping.Implicits._ + val mm = m.asScalaCastOutput + D._sourceMarshaller[T, M](mm, rendering.asScala).compose({ h: akka.stream.javadsl.Source[T, M] ⇒ h.asScala }) + } + + private[this] val ByteStringAsEntityFn = new java.util.function.Function[ByteString, HttpEntity]() { + override def apply(bs: ByteString): HttpEntity = HttpEntities.create(bs) } } + +object FramedEntityStreamingDirectives extends FramedEntityStreamingDirectives diff --git a/akka-http/src/main/scala/akka/http/javadsl/server/directives/FutureDirectives.scala b/akka-http/src/main/scala/akka/http/javadsl/server/directives/FutureDirectives.scala index 7a73f294ef..dd48c4c187 100644 --- a/akka-http/src/main/scala/akka/http/javadsl/server/directives/FutureDirectives.scala +++ b/akka-http/src/main/scala/akka/http/javadsl/server/directives/FutureDirectives.scala @@ -30,6 +30,16 @@ abstract class FutureDirectives extends FormFieldDirectives { } } + /** + * "Unwraps" a `CompletionStage` and runs the inner route after future + * completion with the future's value as an extraction of type `Try`. + */ + def onComplete[T](cs: CompletionStage[T], inner: JFunction[Try[T], Route]) = RouteAdapter { + D.onComplete(cs.toScala.recover(unwrapCompletionException)) { value ⇒ + inner(value).delegate + } + } + /** * "Unwraps" a `CompletionStage[T]` and runs the inner route after future * completion with the future's value as an extraction of type `T` if diff --git a/akka-http/src/main/scala/akka/http/javadsl/unmarshalling/Unmarshaller.scala b/akka-http/src/main/scala/akka/http/javadsl/unmarshalling/Unmarshaller.scala index 8a7a0a7659..368ebe68c3 100644 --- a/akka-http/src/main/scala/akka/http/javadsl/unmarshalling/Unmarshaller.scala +++ b/akka-http/src/main/scala/akka/http/javadsl/unmarshalling/Unmarshaller.scala @@ -101,6 +101,9 @@ abstract class Unmarshaller[-A, B] extends UnmarshallerBase[A, B] { implicit def asScala: akka.http.scaladsl.unmarshalling.Unmarshaller[A, B] + /** INTERNAL API */ + private[akka] def asScalaCastInput[I]: unmarshalling.Unmarshaller[I, B] = asScala.asInstanceOf[unmarshalling.Unmarshaller[I, B]] + def unmarshall(a: A, ec: ExecutionContext, mat: Materializer): CompletionStage[B] = asScala.apply(a)(ec, mat).toJava /** diff --git a/akka-http/src/main/scala/akka/http/scaladsl/common/FramingWithContentType.scala b/akka-http/src/main/scala/akka/http/scaladsl/common/FramingWithContentType.scala new file mode 100644 index 0000000000..75743b674d --- /dev/null +++ b/akka-http/src/main/scala/akka/http/scaladsl/common/FramingWithContentType.scala @@ -0,0 +1,23 @@ +/* + * Copyright (C) 2016 Lightbend Inc. + */ + +package akka.http.scaladsl.common + +import akka.NotUsed +import akka.event.Logging +import akka.http.scaladsl.model.{ ContentType, ContentTypeRange } +import akka.stream.scaladsl.{ Flow, Framing } +import akka.util.ByteString + +/** + * Same as [[akka.stream.scaladsl.Framing]] but additionally can express which [[ContentType]] it supports, + * which can be used to reject routes if content type does not match used framing. + */ +abstract class FramingWithContentType extends akka.http.javadsl.common.FramingWithContentType with Framing { + def flow: Flow[ByteString, ByteString, NotUsed] + override def supported: ContentTypeRange + override def matches(ct: akka.http.javadsl.model.ContentType): Boolean = supported.matches(ct) + + override def toString = s"${Logging.simpleName(getClass)}($supported)" +} diff --git a/akka-http/src/main/scala/akka/http/scaladsl/common/JsonSourceRenderingMode.scala b/akka-http/src/main/scala/akka/http/scaladsl/common/JsonSourceRenderingMode.scala new file mode 100644 index 0000000000..52e3d1b754 --- /dev/null +++ b/akka-http/src/main/scala/akka/http/scaladsl/common/JsonSourceRenderingMode.scala @@ -0,0 +1,126 @@ +/* + * Copyright (C) 2016 Lightbend Inc. + */ + +package akka.http.scaladsl.common + +import akka.http.scaladsl.model.{ ContentType, ContentTypes } +import akka.util.ByteString + +/** + * Specialised rendering mode for streaming elements as JSON. + * + * See also: JSON Streaming on Wikipedia. + */ +trait JsonSourceRenderingMode extends akka.http.javadsl.common.JsonSourceRenderingMode with SourceRenderingMode { + override val contentType: ContentType.WithFixedCharset = + ContentTypes.`application/json` +} + +/** + * Provides default JSON rendering modes. + */ +object JsonSourceRenderingModes { + + /** + * Most compact rendering mode. + * It does not intersperse any separator between the signalled elements. + * + * It is the most compact form to render JSON and can be framed properly by using [[akka.stream.javadsl.JsonFraming.bracketCounting]]. + * + * {{{ + * {"id":42}{"id":43}{"id":44} + * }}} + */ + object Compact extends JsonSourceRenderingMode { + override val start: ByteString = ByteString.empty + override val between: ByteString = ByteString.empty + override val end: ByteString = ByteString.empty + } + + /** + * Simple rendering mode, similar to [[Compact]] however interspersing elements with a `\n` character. + * + * {{{ + * {"id":42},{"id":43},{"id":44} + * }}} + */ + object CompactCommaSeparated extends JsonSourceRenderingMode { + override val start: ByteString = ByteString.empty + override val between: ByteString = ByteString(",") + override val end: ByteString = ByteString.empty + } + + /** + * Rendering mode useful when the receiving end expects a valid JSON Array. + * It can be useful when the client wants to detect when the stream has been successfully received in-full, + * which it can determine by seeing the terminating `]` character. + * + * The framing's terminal `]` will ONLY be emitted if the stream has completed successfully, + * in other words - the stream has been emitted completely, without errors occuring before the final element has been signaled. + * + * {{{ + * [{"id":42},{"id":43},{"id":44}] + * }}} + */ + object ArrayCompact extends JsonSourceRenderingMode { + override val start: ByteString = ByteString("[") + override val between: ByteString = ByteString(",") + override val end: ByteString = ByteString("]") + } + + /** + * Rendering mode useful when the receiving end expects a valid JSON Array. + * It can be useful when the client wants to detect when the stream has been successfully received in-full, + * which it can determine by seeing the terminating `]` character. + * + * The framing's terminal `]` will ONLY be emitted if the stream has completed successfully, + * in other words - the stream has been emitted completely, without errors occuring before the final element has been signaled. + * + * {{{ + * [{"id":42}, + * {"id":43}, + * {"id":44}] + * }}} + */ + object ArrayLineByLine extends JsonSourceRenderingMode { + override val start: ByteString = ByteString("[") + override val between: ByteString = ByteString(",\n") + override val end: ByteString = ByteString("]") + } + + /** + * Recommended rendering mode. + * + * It is a nice balance between valid and human-readable as well as resonably small size overhead (just the `\n` between elements). + * A good example of API's using this syntax is Twitter's Firehose (last verified at 1.1 version of that API). + * + * {{{ + * {"id":42} + * {"id":43} + * {"id":44} + * }}} + */ + object LineByLine extends JsonSourceRenderingMode { + override val start: ByteString = ByteString.empty + override val between: ByteString = ByteString("\n") + override val end: ByteString = ByteString.empty + } + + /** + * Simple rendering mode interspersing each pair of elements with both `,\n`. + * Picking the [[LineByLine]] format may be preferable, as it is slightly simpler to parse - each line being a valid json object (no need to trim the comma). + * + * {{{ + * {"id":42}, + * {"id":43}, + * {"id":44} + * }}} + */ + object LineByLineCommaSeparated extends JsonSourceRenderingMode { + override val start: ByteString = ByteString.empty + override val between: ByteString = ByteString(",\n") + override val end: ByteString = ByteString.empty + } + +} diff --git a/akka-http/src/main/scala/akka/http/scaladsl/common/SourceRenderingMode.scala b/akka-http/src/main/scala/akka/http/scaladsl/common/SourceRenderingMode.scala new file mode 100644 index 0000000000..61abd85144 --- /dev/null +++ b/akka-http/src/main/scala/akka/http/scaladsl/common/SourceRenderingMode.scala @@ -0,0 +1,11 @@ +/* + * Copyright (C) 2016 Lightbend Inc. + */ + +package akka.http.scaladsl.common + +import akka.http.scaladsl.model.ContentType + +trait SourceRenderingMode extends akka.http.javadsl.common.SourceRenderingMode { + override def contentType: ContentType +} diff --git a/akka-http/src/main/scala/akka/http/scaladsl/common/StrictForm.scala b/akka-http/src/main/scala/akka/http/scaladsl/common/StrictForm.scala index 124746ccd5..615941d25c 100644 --- a/akka-http/src/main/scala/akka/http/scaladsl/common/StrictForm.scala +++ b/akka-http/src/main/scala/akka/http/scaladsl/common/StrictForm.scala @@ -61,8 +61,8 @@ object StrictForm { fsu(value.entity.data.decodeString(charsetName)) }) - @implicitNotFound("In order to unmarshal a `StrictForm.Field` to type `${T}` you need to supply a " + - "`FromStringUnmarshaller[${T}]` and/or a `FromEntityUnmarshaller[${T}]`") + @implicitNotFound(s"In order to unmarshal a `StrictForm.Field` to type `$${T}` you need to supply a " + + s"`FromStringUnmarshaller[$${T}]` and/or a `FromEntityUnmarshaller[$${T}]`") sealed trait FieldUnmarshaller[T] { def unmarshalString(value: String)(implicit ec: ExecutionContext, mat: Materializer): Future[T] def unmarshalPart(value: Multipart.FormData.BodyPart.Strict)(implicit ec: ExecutionContext, mat: Materializer): Future[T] diff --git a/akka-http/src/main/scala/akka/http/scaladsl/marshalling/Marshaller.scala b/akka-http/src/main/scala/akka/http/scaladsl/marshalling/Marshaller.scala index 81c0290448..6606ff9f37 100644 --- a/akka-http/src/main/scala/akka/http/scaladsl/marshalling/Marshaller.scala +++ b/akka-http/src/main/scala/akka/http/scaladsl/marshalling/Marshaller.scala @@ -4,7 +4,9 @@ package akka.http.scaladsl.marshalling -import scala.concurrent.{ Future, ExecutionContext } +import akka.http.scaladsl.marshalling.Marshalling.Opaque + +import scala.concurrent.{ ExecutionContext, Future } import scala.util.control.NonFatal import akka.http.scaladsl.model._ import akka.http.scaladsl.util.FastFuture @@ -174,4 +176,4 @@ object Marshalling { def map[B](f: A ⇒ B): Opaque[B] = copy(marshal = () ⇒ f(marshal())) } } -//# \ No newline at end of file +//# diff --git a/akka-http/src/main/scala/akka/http/scaladsl/server/EntityStreamingSupport.scala b/akka-http/src/main/scala/akka/http/scaladsl/server/EntityStreamingSupport.scala new file mode 100644 index 0000000000..9b6f9bf988 --- /dev/null +++ b/akka-http/src/main/scala/akka/http/scaladsl/server/EntityStreamingSupport.scala @@ -0,0 +1,69 @@ +/* + * Copyright (C) 2009-2015 Typesafe Inc. + */ +package akka.http.scaladsl.server + +import akka.NotUsed +import akka.actor.ActorSystem +import akka.event.Logging +import akka.http.scaladsl.common.{ FramingWithContentType, SourceRenderingMode } +import akka.http.scaladsl.model.{ ContentTypeRange, ContentTypes, MediaRange, MediaRanges } +import akka.stream.scaladsl.{ Flow, Framing } +import akka.util.ByteString +import com.typesafe.config.Config + +/** + * Entity streaming support, independent of used Json parsing library etc. + * + * Can be extended by various Support traits (e.g. "SprayJsonSupport"), + * in order to provide users with both `framing` (this trait) and `marshalling` + * (implemented by a library) by using a single trait. + */ +trait EntityStreamingSupport extends EntityStreamingSupportBase { + + /** + * Implement as `implicit val` with required framing implementation, for example in + * the case of streaming JSON uploads it could be `bracketCountingJsonFraming(maximumObjectLength)`. + */ + def incomingEntityStreamFraming: FramingWithContentType + + /** + * Implement as `implicit val` with the rendering mode to be used when redering `Source` instances. + * For example for JSON it could be [[akka.http.scaladsl.common.JsonSourceRenderingMode.CompactArray]] + * or [[akka.http.scaladsl.common.JsonSourceRenderingMode.LineByLine]]. + */ + def outgoingEntityStreamRendering: SourceRenderingMode +} + +trait EntityStreamingSupportBase { + /** `application/json` specific Framing implementation */ + def bracketCountingJsonFraming(maximumObjectLength: Int): FramingWithContentType = + new ApplicationJsonBracketCountingFraming(maximumObjectLength) + + /** + * Frames incoming `text / *` entities on a line-by-line basis. + * Useful for accepting `text/csv` uploads as a stream of rows. + */ + def newLineFraming(maximumObjectLength: Int, supportedContentTypes: ContentTypeRange): FramingWithContentType = + new FramingWithContentType { + override final val flow: Flow[ByteString, ByteString, NotUsed] = + Flow[ByteString].via(Framing.delimiter(ByteString("\n"), maximumObjectLength)) + + override final val supported: ContentTypeRange = + ContentTypeRange(MediaRanges.`text/*`) + } +} + +/** + * Entity streaming support, independent of used Json parsing library etc. + * + * Can be extended by various Support traits (e.g. "SprayJsonSupport"), + * in order to provide users with both `framing` (this trait) and `marshalling` + * (implemented by a library) by using a single trait. + */ +object EntityStreamingSupport extends EntityStreamingSupportBase + +final class ApplicationJsonBracketCountingFraming(maximumObjectLength: Int) extends FramingWithContentType { + override final val flow = Flow[ByteString].via(akka.stream.scaladsl.JsonFraming.bracketCounting(maximumObjectLength)) + override final val supported = ContentTypeRange(ContentTypes.`application/json`) +} diff --git a/akka-http/src/main/scala/akka/http/scaladsl/server/JsonEntityStreaming.scala b/akka-http/src/main/scala/akka/http/scaladsl/server/JsonEntityStreaming.scala deleted file mode 100644 index 228aaf96a8..0000000000 --- a/akka-http/src/main/scala/akka/http/scaladsl/server/JsonEntityStreaming.scala +++ /dev/null @@ -1,168 +0,0 @@ -/* - * Copyright (C) 2009-2015 Typesafe Inc. - */ -package akka.http.scaladsl.server - -import akka.NotUsed -import akka.actor.ActorSystem -import akka.http.impl.util.JavaMapping -import akka.http.scaladsl.model.{ ContentType, ContentTypes } -import akka.http.scaladsl.server.directives.FramedEntityStreamingDirectives.SourceRenderingMode -import akka.stream.scaladsl.{ Flow, Framing } -import akka.util.ByteString -import com.typesafe.config.Config - -import scala.collection.immutable - -/** - * Same as [[akka.stream.scaladsl.Framing]] but additionally can express which [[ContentType]] it supports, - * which can be used to reject routes if content type does not match used framing. - */ -abstract class FramingWithContentType extends Framing { - def flow: Flow[ByteString, ByteString, NotUsed] - def supported: immutable.Set[ContentType] - def isSupported(ct: akka.http.javadsl.model.ContentType): Boolean = supported(JavaMapping.ContentType.toScala(ct)) -} -object FramingWithContentType { - def apply(framing: Flow[ByteString, ByteString, NotUsed], contentType: ContentType, moreContentTypes: ContentType*) = - new FramingWithContentType { - override def flow = framing - - override val supported: immutable.Set[ContentType] = - if (moreContentTypes.isEmpty) Set(contentType) - else Set(contentType) ++ moreContentTypes - } -} - -/** - * Json entity streaming support, independent of used Json parsing library. - * - * Can be extended by various Support traits (e.g. "SprayJsonSupport"), - * in order to provide users with both `framing` (this trait) and `marshalling` - * (implemented by a library) by using a single trait. - */ -trait JsonEntityFramingSupport { - - /** `application/json` specific Framing implementation */ - def bracketCountingJsonFraming(maximumObjectLength: Int) = new FramingWithContentType { - override final val flow = Flow[ByteString].via(akka.stream.scaladsl.JsonFraming.bracketCounting(maximumObjectLength)) - - override val supported: immutable.Set[ContentType] = Set(ContentTypes.`application/json`) - } -} -object JsonEntityFramingSupport extends JsonEntityFramingSupport - -/** - * Specialised rendering mode for streaming elements as JSON. - * - * See also: JSON Streaming on Wikipedia. - */ -trait JsonSourceRenderingMode extends SourceRenderingMode { - override val contentType = ContentTypes.`application/json` -} - -object JsonSourceRenderingMode { - - /** - * Most compact rendering mode - * It does not intersperse any separator between the signalled elements. - * - * {{{ - * {"id":42}{"id":43}{"id":44} - * }}} - */ - object Compact extends JsonSourceRenderingMode { - override val start: ByteString = ByteString.empty - override val between: ByteString = ByteString.empty - override val end: ByteString = ByteString.empty - } - - /** - * Simple rendering mode, similar to [[Compact]] however interspersing elements with a `\n` character. - * - * {{{ - * {"id":42},{"id":43},{"id":44} - * }}} - */ - object CompactCommaSeparated extends JsonSourceRenderingMode { - override val start: ByteString = ByteString.empty - override val between: ByteString = ByteString(",") - override val end: ByteString = ByteString.empty - } - - /** - * Rendering mode useful when the receiving end expects a valid JSON Array. - * It can be useful when the client wants to detect when the stream has been successfully received in-full, - * which it can determine by seeing the terminating `]` character. - * - * The framing's terminal `]` will ONLY be emitted if the stream has completed successfully, - * in other words - the stream has been emitted completely, without errors occuring before the final element has been signaled. - * - * {{{ - * [{"id":42},{"id":43},{"id":44}] - * }}} - */ - object CompactArray extends JsonSourceRenderingMode { - override val start: ByteString = ByteString("[") - override val between: ByteString = ByteString(",") - override val end: ByteString = ByteString("]") - } - - /** - * Recommended rendering mode. - * - * It is a nice balance between valid and human-readable as well as resonably small size overhead (just the `\n` between elements). - * A good example of API's using this syntax is Twitter's Firehose (last verified at 1.1 version of that API). - * - * {{{ - * {"id":42} - * {"id":43} - * {"id":44} - * }}} - */ - object LineByLine extends JsonSourceRenderingMode { - override val start: ByteString = ByteString.empty - override val between: ByteString = ByteString("\n") - override val end: ByteString = ByteString.empty - } - - /** - * Simple rendering mode interspersing each pair of elements with both `,\n`. - * Picking the [[LineByLine]] format may be preferable, as it is slightly simpler to parse - each line being a valid json object (no need to trim the comma). - * - * {{{ - * {"id":42}, - * {"id":43}, - * {"id":44} - * }}} - */ - object LineByLineCommaSeparated extends JsonSourceRenderingMode { - override val start: ByteString = ByteString.empty - override val between: ByteString = ByteString(",\n") - override val end: ByteString = ByteString.empty - } - -} - -object JsonStreamingSettings { - - def apply(sys: ActorSystem): JsonStreamingSettings = - apply(sys.settings.config.getConfig("akka.http.json-streaming")) - - def apply(c: Config): JsonStreamingSettings = { - JsonStreamingSettings( - c.getInt("max-object-size"), - renderingMode(c.getString("rendering-mode"))) - } - - def renderingMode(name: String): SourceRenderingMode = name match { - case "line-by-line" ⇒ JsonSourceRenderingMode.LineByLine // the default - case "line-by-line-comma-separated" ⇒ JsonSourceRenderingMode.LineByLineCommaSeparated - case "compact" ⇒ JsonSourceRenderingMode.Compact - case "compact-comma-separated" ⇒ JsonSourceRenderingMode.CompactCommaSeparated - case "compact-array" ⇒ JsonSourceRenderingMode.CompactArray - } -} -final case class JsonStreamingSettings( - maxObjectSize: Int, - style: SourceRenderingMode) diff --git a/akka-http/src/main/scala/akka/http/scaladsl/server/directives/FramedEntityStreamingDirectives.scala b/akka-http/src/main/scala/akka/http/scaladsl/server/directives/FramedEntityStreamingDirectives.scala index e9a6996a96..c93c6e4020 100644 --- a/akka-http/src/main/scala/akka/http/scaladsl/server/directives/FramedEntityStreamingDirectives.scala +++ b/akka-http/src/main/scala/akka/http/scaladsl/server/directives/FramedEntityStreamingDirectives.scala @@ -4,9 +4,9 @@ package akka.http.scaladsl.server.directives import akka.NotUsed +import akka.http.scaladsl.common.{ FramingWithContentType, SourceRenderingMode } import akka.http.scaladsl.marshalling._ import akka.http.scaladsl.model._ -import akka.http.scaladsl.server.FramingWithContentType import akka.http.scaladsl.unmarshalling.{ Unmarshal, Unmarshaller, _ } import akka.http.scaladsl.util.FastFuture import akka.stream.Materializer @@ -28,33 +28,31 @@ trait FramedEntityStreamingDirectives extends MarshallingDirectives { // TODO DOCS - final def stream[T](implicit um: Unmarshaller[ByteString, T], framing: FramingWithContentType): RequestToSourceUnmarshaller[T] = - streamAsync(1)(um, framing) - final def stream[T](framing: FramingWithContentType)(implicit um: Unmarshaller[ByteString, T]): RequestToSourceUnmarshaller[T] = - streamAsync(1)(um, framing) + final def asSourceOf[T](implicit um: Unmarshaller[HttpEntity, T], framing: FramingWithContentType): RequestToSourceUnmarshaller[T] = + asSourceOfAsync(1)(um, framing) + final def asSourceOf[T](framing: FramingWithContentType)(implicit um: Unmarshaller[HttpEntity, T]): RequestToSourceUnmarshaller[T] = + asSourceOfAsync(1)(um, framing) - final def streamAsync[T](parallelism: Int)(implicit um: Unmarshaller[ByteString, T], framing: FramingWithContentType): RequestToSourceUnmarshaller[T] = - streamInternal[T](framing, (ec, mat) ⇒ Flow[ByteString].mapAsync(parallelism)(Unmarshal(_).to[T](um, ec, mat))) - final def streamAsync[T](parallelism: Int, framing: FramingWithContentType)(implicit um: Unmarshaller[ByteString, T]): RequestToSourceUnmarshaller[T] = - streamAsync(parallelism)(um, framing) + final def asSourceOfAsync[T](parallelism: Int)(implicit um: Unmarshaller[HttpEntity, T], framing: FramingWithContentType): RequestToSourceUnmarshaller[T] = + asSourceOfInternal[T](framing, (ec, mat) ⇒ Flow[HttpEntity].mapAsync(parallelism)(Unmarshal(_).to[T](um, ec, mat))) + final def asSourceOfAsync[T](parallelism: Int, framing: FramingWithContentType)(implicit um: Unmarshaller[HttpEntity, T]): RequestToSourceUnmarshaller[T] = + asSourceOfAsync(parallelism)(um, framing) - final def streamAsyncUnordered[T](parallelism: Int)(implicit um: Unmarshaller[ByteString, T], framing: FramingWithContentType): RequestToSourceUnmarshaller[T] = - streamInternal[T](framing, (ec, mat) ⇒ Flow[ByteString].mapAsyncUnordered(parallelism)(Unmarshal(_).to[T](um, ec, mat))) - final def streamAsyncUnordered[T](parallelism: Int, framing: FramingWithContentType)(implicit um: Unmarshaller[ByteString, T]): RequestToSourceUnmarshaller[T] = - streamAsyncUnordered(parallelism)(um, framing) - - // TODO materialized value may want to be "drain/cancel" or something like it? - // TODO could expose `streamMat`, for more fine grained picking of Marshaller + final def asSourceOfAsyncUnordered[T](parallelism: Int)(implicit um: Unmarshaller[HttpEntity, T], framing: FramingWithContentType): RequestToSourceUnmarshaller[T] = + asSourceOfInternal[T](framing, (ec, mat) ⇒ Flow[HttpEntity].mapAsyncUnordered(parallelism)(Unmarshal(_).to[T](um, ec, mat))) + final def asSourceOfAsyncUnordered[T](parallelism: Int, framing: FramingWithContentType)(implicit um: Unmarshaller[HttpEntity, T]): RequestToSourceUnmarshaller[T] = + asSourceOfAsyncUnordered(parallelism)(um, framing) // format: OFF - private def streamInternal[T](framing: FramingWithContentType, marshalling: (ExecutionContext, Materializer) => Flow[ByteString, ByteString, NotUsed]#ReprMat[T, NotUsed]): RequestToSourceUnmarshaller[T] = + private def asSourceOfInternal[T](framing: FramingWithContentType, marshalling: (ExecutionContext, Materializer) => Flow[HttpEntity, ByteString, NotUsed]#ReprMat[T, NotUsed]): RequestToSourceUnmarshaller[T] = Unmarshaller.withMaterializer[HttpRequest, Source[T, NotUsed]] { implicit ec ⇒ implicit mat ⇒ req ⇒ val entity = req.entity - if (!framing.supported(entity.contentType)) { - val supportedContentTypes = framing.supported.map(ContentTypeRange(_)) + if (!framing.matches(entity.contentType)) { + val supportedContentTypes = framing.supported FastFuture.failed(Unmarshaller.UnsupportedContentTypeException(supportedContentTypes)) } else { - val stream = entity.dataBytes.via(framing.flow).via(marshalling(ec, mat)).mapMaterializedValue(_ => NotUsed) +// val stream = entity.dataBytes.via(framing.flow).via(marshalling(ec, mat)).mapMaterializedValue(_ => NotUsed) + val stream = Source.single(entity.transformDataBytes(framing.flow)).via(marshalling(ec, mat)).mapMaterializedValue(_ => NotUsed) FastFuture.successful(stream) } } @@ -65,7 +63,7 @@ trait FramedEntityStreamingDirectives extends MarshallingDirectives { implicit def _sourceMarshaller[T, M](implicit m: ToEntityMarshaller[T], mode: SourceRenderingMode): ToResponseMarshaller[Source[T, M]] = Marshaller[Source[T, M], HttpResponse] { implicit ec ⇒ source ⇒ FastFuture successful { - Marshalling.WithFixedContentType(mode.contentType, () ⇒ { // TODO charset? + Marshalling.WithFixedContentType(mode.contentType, () ⇒ { val bytes = source .mapAsync(1)(t ⇒ Marshal(t).to[HttpEntity]) .map(_.dataBytes) @@ -79,7 +77,7 @@ trait FramedEntityStreamingDirectives extends MarshallingDirectives { implicit def _sourceParallelismMarshaller[T](implicit m: ToEntityMarshaller[T], mode: SourceRenderingMode): ToResponseMarshaller[AsyncRenderingOf[T]] = Marshaller[AsyncRenderingOf[T], HttpResponse] { implicit ec ⇒ rendering ⇒ FastFuture successful { - Marshalling.WithFixedContentType(mode.contentType, () ⇒ { // TODO charset? + Marshalling.WithFixedContentType(mode.contentType, () ⇒ { val bytes = rendering.source .mapAsync(rendering.parallelism)(t ⇒ Marshal(t).to[HttpEntity]) .map(_.dataBytes) @@ -93,7 +91,7 @@ trait FramedEntityStreamingDirectives extends MarshallingDirectives { implicit def _sourceUnorderedMarshaller[T](implicit m: ToEntityMarshaller[T], mode: SourceRenderingMode): ToResponseMarshaller[AsyncUnorderedRenderingOf[T]] = Marshaller[AsyncUnorderedRenderingOf[T], HttpResponse] { implicit ec ⇒ rendering ⇒ FastFuture successful { - Marshalling.WithFixedContentType(mode.contentType, () ⇒ { // TODO charset? + Marshalling.WithFixedContentType(mode.contentType, () ⇒ { val bytes = rendering.source .mapAsync(rendering.parallelism)(t ⇒ Marshal(t).to[HttpEntity]) .map(_.dataBytes) @@ -106,32 +104,34 @@ trait FramedEntityStreamingDirectives extends MarshallingDirectives { // special rendering modes - implicit def enableSpecialSourceRenderingModes[T](source: Source[T, Any]): EnableSpecialSourceRenderingModes[T] = + implicit def _enableSpecialSourceRenderingModes[T](source: Source[T, Any]): EnableSpecialSourceRenderingModes[T] = new EnableSpecialSourceRenderingModes(source) } object FramedEntityStreamingDirectives extends FramedEntityStreamingDirectives { - /** - * Defines ByteStrings to be injected before the first, between, and after all elements of a [[Source]], - * when used to complete a request. - * - * A typical example would be rendering a ``Source[T, _]`` as JSON array, - * where start is `[`, between is `,`, and end is `]` - which procudes a valid json array, assuming each element can - * be properly marshalled as JSON object. - * - * The corresponding values will typically be put into an [[Source.intersperse]] call on the to-be-rendered Source. - */ - trait SourceRenderingMode extends akka.http.javadsl.server.directives.FramedEntityStreamingDirectives.SourceRenderingMode { - override final def getContentType = contentType - def contentType: ContentType - } - - final class AsyncRenderingOf[T](val source: Source[T, Any], val parallelism: Int) - final class AsyncUnorderedRenderingOf[T](val source: Source[T, Any], val parallelism: Int) + sealed class AsyncSourceRenderingMode + final class AsyncRenderingOf[T](val source: Source[T, Any], val parallelism: Int) extends AsyncSourceRenderingMode + final class AsyncUnorderedRenderingOf[T](val source: Source[T, Any], val parallelism: Int) extends AsyncSourceRenderingMode } final class EnableSpecialSourceRenderingModes[T](val source: Source[T, Any]) extends AnyVal { + /** + * Causes the response stream to be marshalled asynchronously (up to `parallelism` elements at once), + * while retaining the ordering of incoming elements. + * + * See also [[Source.mapAsync]]. + */ def renderAsync(parallelism: Int) = new FramedEntityStreamingDirectives.AsyncRenderingOf(source, parallelism) + /** + * Causes the response stream to be marshalled asynchronously (up to `parallelism` elements at once), + * emitting the first one that finished marshalling onto the wire. + * + * This sacrifices ordering of the incoming data in regards to data actually rendered onto the wire, + * but may be faster if some elements are smaller than other ones by not stalling the small elements + * from being written while the large one still is being marshalled. + * + * See also [[Source.mapAsyncUnordered]]. + */ def renderAsyncUnordered(parallelism: Int) = new FramedEntityStreamingDirectives.AsyncUnorderedRenderingOf(source, parallelism) } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Framing.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Framing.scala index b7b21030b1..6ae30d8564 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Framing.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Framing.scala @@ -115,3 +115,18 @@ object Framing { scaladsl.Framing.simpleFramingProtocol(maximumMessageLength).asJava } + +/** + * Wrapper around a framing Flow (as provided by [[Framing.delimiter]] for example. + * Used for providing a framing implicitly for other components which may need one (such as framed entity streaming in Akka HTTP). + */ +trait Framing { + def asScala: akka.stream.scaladsl.Framing = + this match { + case f: akka.stream.scaladsl.Framing ⇒ f + case _ ⇒ new akka.stream.scaladsl.Framing { + override def flow = getFlow.asScala + } + } + def getFlow: Flow[ByteString, ByteString, NotUsed] +} diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Framing.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Framing.scala index d68b19560e..4dadca9756 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Framing.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Framing.scala @@ -292,6 +292,9 @@ object Framing { * Wrapper around a framing Flow (as provided by [[Framing.delimiter]] for example. * Used for providing a framing implicitly for other components which may need one (such as framed entity streaming in Akka HTTP). */ -trait Framing { +trait Framing extends akka.stream.javadsl.Framing { + final def asJava: akka.stream.javadsl.Framing = this + override final def getFlow = flow.asJava + def flow: Flow[ByteString, ByteString, NotUsed] }