+htp #18837 implemented JavaDSL for Source streaming

This commit is contained in:
Konrad Malawski 2016-07-25 01:50:55 +02:00
parent 24454f7f09
commit db880a3db0
26 changed files with 708 additions and 276 deletions

View file

@ -105,7 +105,7 @@ class JsonStreamingExamplesSpec extends RoutingSpec {
val route =
path("metrics") {
// [4] extract Source[Measurement, _]
entity(stream[Measurement]) { measurements =>
entity(asSourceOf[Measurement]) { measurements =>
println("measurements = " + measurements)
val measurementsSubmitted: Future[Int] =
measurements

View file

@ -25,6 +25,9 @@ public final class ContentTypes {
public static final ContentType.WithCharset TEXT_XML_UTF8 =
akka.http.scaladsl.model.ContentTypes.text$divxml$u0028UTF$minus8$u0029();
public static final ContentType.WithCharset TEXT_CSV_UTF8 =
akka.http.scaladsl.model.ContentTypes.text$divcsv$u0028UTF$minus8$u0029();
public static ContentType.Binary create(MediaType.Binary mediaType) {
return ContentType$.MODULE$.apply((akka.http.scaladsl.model.MediaType.Binary) mediaType);
}

View file

@ -3,15 +3,20 @@
*/
package akka.http.javadsl.server;
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.http.javadsl.ConnectHttp;
import akka.http.javadsl.Http;
import akka.http.javadsl.ServerBinding;
import akka.http.javadsl.marshallers.jackson.Jackson;
import akka.http.javadsl.model.HttpEntity;
import akka.http.javadsl.model.HttpRequest;
import akka.http.javadsl.model.HttpResponse;
import akka.http.javadsl.model.StatusCodes;
import akka.http.javadsl.common.JsonSourceRenderingModes;
import akka.stream.ActorMaterializer;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Source;
import scala.concurrent.duration.Duration;
import scala.runtime.BoxedUnit;
@ -55,8 +60,26 @@ public class JavaTestServer extends AllDirectives { // or import static Directiv
);
final Route crash = path("crash", () ->
path("scala", () -> completeOKWithFutureString(akka.dispatch.Futures.<String>failed(new Exception("Boom!")))).orElse(
path("java", () -> completeOKWithFutureString(CompletableFuture.<String>supplyAsync(() -> { throw new RuntimeException("Boom!"); }))))
path("scala", () -> completeOKWithFutureString(akka.dispatch.Futures.failed(new Exception("Boom!")))).orElse(
path("java", () -> completeOKWithFutureString(CompletableFuture.supplyAsync(() -> { throw new RuntimeException("Boom!"); }))))
);
final Unmarshaller<HttpEntity, JavaTweet> JavaTweets = Jackson.unmarshaller(JavaTweet.class);
final Route tweets = path("tweets", () ->
get(() ->
parameter(StringUnmarshallers.INTEGER, "n", n -> {
final Source<JavaTweet, NotUsed> tws = Source.repeat(new JavaTweet("Hello World!")).take(n);
return completeOKWithSource(tws, Jackson.marshaller(), JsonSourceRenderingModes.arrayCompact());
})
).orElse(
post(() ->
extractMaterializer(mat ->
entityasSourceOf(JavaTweets, null, sourceOfTweets -> {
final CompletionStage<Integer> tweetsCount = sourceOfTweets.runFold(0, (acc, tweet) -> acc + 1, mat);
return onComplete(tweetsCount, c -> complete("Total number of tweets: " + c));
})
)
))
);
final Route inner = path("inner", () ->
@ -64,9 +87,14 @@ public class JavaTestServer extends AllDirectives { // or import static Directiv
);
return get(() ->
index.orElse(secure).orElse(ping).orElse(crash).orElse(inner).orElse(requestTimeout)
);
return index
.orElse(secure)
.orElse(ping)
.orElse(crash)
.orElse(inner)
.orElse(requestTimeout)
.orElse(tweets)
;
}
private void silentSleep(int millis) {
@ -113,7 +141,7 @@ public class JavaTestServer extends AllDirectives { // or import static Directiv
final Flow<HttpRequest, HttpResponse, ?> flow = createRoute().flow(system, mat);
final CompletionStage<ServerBinding> binding =
Http.get(system).bindAndHandle(flow, ConnectHttp.toHost("127.0.0.1"), mat);
Http.get(system).bindAndHandle(flow, ConnectHttp.toHost("127.0.0.1", 8080), mat);
System.console().readLine("Press [ENTER] to quit...");
shutdown(binding);
@ -131,4 +159,21 @@ public class JavaTestServer extends AllDirectives { // or import static Directiv
}
});
}
private static final class JavaTweet {
private String message;
public JavaTweet(String message) {
this.message = message;
}
public void setMessage(String message) {
this.message = message;
}
public String getMessage() {
return message;
}
}
}

View file

@ -18,6 +18,7 @@ import scala.concurrent.Await
private[akka] trait IntegrationRoutingSpec extends WordSpecLike with Matchers with BeforeAndAfterAll
with Directives with RequestBuilding
with ScalaFutures with IntegrationPatience {
import IntegrationRoutingSpec._
implicit val system = ActorSystem(AkkaSpec.getCallerName(getClass))
implicit val mat = ActorMaterializer()
@ -31,8 +32,6 @@ private[akka] trait IntegrationRoutingSpec extends WordSpecLike with Matchers wi
def ~!>(route: Route) = new Prepped(request, route)
}
final case class Prepped(request: HttpRequest, route: Route)
implicit class Checking(p: Prepped) {
def ~!>(checking: HttpResponse Unit) = {
val (_, host, port) = TestUtils.temporaryServerHostnameAndPort()
@ -47,3 +46,7 @@ private[akka] trait IntegrationRoutingSpec extends WordSpecLike with Matchers wi
}
}
object IntegrationRoutingSpec {
final case class Prepped(request: HttpRequest, route: Route)
}

View file

@ -4,14 +4,18 @@
package akka.http.scaladsl.server
import akka.NotUsed
import akka.http.scaladsl.marshallers.xml.ScalaXmlSupport
import akka.http.scaladsl.model.{ StatusCodes, HttpResponse }
import akka.http.scaladsl.model.{ HttpResponse, StatusCodes }
import akka.http.scaladsl.server.directives.Credentials
import com.typesafe.config.{ ConfigFactory, Config }
import com.typesafe.config.{ Config, ConfigFactory }
import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl._
import akka.http.scaladsl.Http
import akka.http.scaladsl.common.{ FramingWithContentType, JsonSourceRenderingModes, SourceRenderingMode }
import akka.http.scaladsl.marshalling.ToResponseMarshallable
import scala.concurrent.duration._
import scala.io.StdIn
@ -21,10 +25,26 @@ object TestServer extends App {
akka.log-dead-letters = off
akka.stream.materializer.debug.fuzzing-mode = off
""")
implicit val system = ActorSystem("ServerTest", testConf)
import system.dispatcher
implicit val materializer = ActorMaterializer()
// --------- json streaming ---------
import spray.json.DefaultJsonProtocol._
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
final case class Tweet(message: String)
implicit val tweetFormat = jsonFormat1(Tweet)
// FIXME: Need to be able to support composive framing with content type (!!!!!!!)
import akka.http.scaladsl.server.EntityStreamingSupport._
/* override if extending EntityStreamingSupport */
implicit val incomingEntityStreamFraming: FramingWithContentType = bracketCountingJsonFraming(128)
/* override if extending EntityStreamingSupport */
implicit val outgoingEntityStreamRendering: SourceRenderingMode = JsonSourceRenderingModes.LineByLine
// --------- end of json streaming ---------
import ScalaXmlSupport._
import Directives._
@ -32,7 +52,8 @@ object TestServer extends App {
case p @ Credentials.Provided(name) if p.verify(name + "-password") name
}
val bindingFuture = Http().bindAndHandle({
// format: OFF
val routes = {
get {
path("") {
withRequestTimeout(1.milli, _ HttpResponse(
@ -44,7 +65,13 @@ object TestServer extends App {
} ~
path("secure") {
authenticateBasicPF("My very secure site", auth) { user
complete(<html><body>Hello <b>{ user }</b>. Access has been granted!</body></html>)
complete(<html>
<body>Hello
<b>
{user}
</b>
. Access has been granted!</body>
</html>)
}
} ~
path("ping") {
@ -52,9 +79,28 @@ object TestServer extends App {
} ~
path("crash") {
complete(sys.error("BOOM!"))
} ~
path("tweet") {
complete(Tweet("Hello, world!"))
} ~
(path("tweets") & parameter('n.as[Int])) { n =>
get {
val tweets = Source.repeat(Tweet("Hello, world!")).take(n)
complete(ToResponseMarshallable(tweets))
} ~
post {
entity(asSourceOf[Tweet]) { tweets
// entity(asSourceOf[Tweet](bracketCountingJsonFraming(1024))) { tweets: Source[Tweet, NotUsed]
complete(s"Total tweets received: " + tweets.runFold(0)({ case (acc, t) => acc + 1 }))
}
} ~ pathPrefix("inner")(getFromResourceDirectory("someDir"))
}, interface = "localhost", port = 8080)
}
}
} ~
pathPrefix("inner")(getFromResourceDirectory("someDir"))
}
// format: ON
val bindingFuture = Http().bindAndHandle(routes, interface = "0.0.0.0", port = 8080)
println(s"Server online at http://localhost:8080/\nPress RETURN to stop...")
StdIn.readLine()

View file

@ -0,0 +1,41 @@
/*
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.http.javadsl.common
import akka.http.javadsl.model.ContentType.WithCharset
import akka.http.javadsl.model.ContentTypes
import akka.util.ByteString
/**
* Specialised rendering mode for streaming elements as CSV.
*/
trait CsvSourceRenderingMode extends SourceRenderingMode {
override val contentType: WithCharset =
ContentTypes.TEXT_CSV_UTF8
}
object CsvSourceRenderingModes {
/**
* Render sequence of values as row-by-row ('\n' separated) series of values.
*/
val create: CsvSourceRenderingMode =
new CsvSourceRenderingMode {
override def between: ByteString = ByteString("\n")
override def end: ByteString = ByteString.empty
override def start: ByteString = ByteString.empty
}
/**
* Render sequence of values as row-by-row (with custom row separator,
* e.g. if you need to use '\r\n' instead of '\n') series of values.
*/
def custom(rowSeparator: String): CsvSourceRenderingMode =
new CsvSourceRenderingMode {
override def between: ByteString = ByteString(rowSeparator)
override def end: ByteString = ByteString.empty
override def start: ByteString = ByteString.empty
}
}

View file

@ -0,0 +1,24 @@
/*
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.http.javadsl.common
import akka.http.javadsl.model.ContentTypeRange
import akka.stream.javadsl.Framing
trait FramingWithContentType extends Framing { self
import akka.http.impl.util.JavaMapping.Implicits._
override def asScala: akka.http.scaladsl.common.FramingWithContentType =
this match {
case f: akka.http.scaladsl.common.FramingWithContentType f
case _ new akka.http.scaladsl.common.FramingWithContentType {
override def flow = self.getFlow.asScala
override def supported = self.supported.asScala
}
}
def supported: ContentTypeRange
def matches(ct: akka.http.javadsl.model.ContentType): Boolean = supported.matches(ct)
}

View file

@ -0,0 +1,101 @@
/*
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.http.javadsl.common
import akka.http.javadsl.model.{ ContentType, ContentTypes }
/**
* Specialised rendering mode for streaming elements as JSON.
*
* See also: <a href="https://en.wikipedia.org/wiki/JSON_Streaming">JSON Streaming on Wikipedia</a>.
*/
trait JsonSourceRenderingMode extends SourceRenderingMode {
override val contentType: ContentType.WithFixedCharset =
ContentTypes.APPLICATION_JSON
}
/**
* Provides default JSON rendering modes.
*/
object JsonSourceRenderingModes {
/**
* Most compact rendering mode.
* It does not intersperse any separator between the signalled elements.
*
* It can be used with [[akka.stream.javadsl.JsonFraming.bracketCounting]].
*
* {{{
* {"id":42}{"id":43}{"id":44}
* }}}
*/
val compact = akka.http.scaladsl.common.JsonSourceRenderingModes.Compact
/**
* Simple rendering mode, similar to [[compact]] however interspersing elements with a `\n` character.
*
* {{{
* {"id":42},{"id":43},{"id":44}
* }}}
*/
val compactCommaSeparated = akka.http.scaladsl.common.JsonSourceRenderingModes.CompactCommaSeparated
/**
* Rendering mode useful when the receiving end expects a valid JSON Array.
* It can be useful when the client wants to detect when the stream has been successfully received in-full,
* which it can determine by seeing the terminating `]` character.
*
* The framing's terminal `]` will ONLY be emitted if the stream has completed successfully,
* in other words - the stream has been emitted completely, without errors occuring before the final element has been signaled.
*
* {{{
* [{"id":42},{"id":43},{"id":44}]
* }}}
*/
val arrayCompact = akka.http.scaladsl.common.JsonSourceRenderingModes.ArrayCompact
/**
* Rendering mode useful when the receiving end expects a valid JSON Array.
* It can be useful when the client wants to detect when the stream has been successfully received in-full,
* which it can determine by seeing the terminating `]` character.
*
* The framing's terminal `]` will ONLY be emitted if the stream has completed successfully,
* in other words - the stream has been emitted completely, without errors occuring before the final element has been signaled.
*
* {{{
* [{"id":42},
* {"id":43},
* {"id":44}]
* }}}
*/
val arrayLineByLine = akka.http.scaladsl.common.JsonSourceRenderingModes.ArrayLineByLine
/**
* Recommended rendering mode.
*
* It is a nice balance between valid and human-readable as well as resonably small size overhead (just the `\n` between elements).
* A good example of API's using this syntax is Twitter's Firehose (last verified at 1.1 version of that API).
*
* {{{
* {"id":42}
* {"id":43}
* {"id":44}
* }}}
*/
val lineByLine = akka.http.scaladsl.common.JsonSourceRenderingModes.LineByLine
/**
* Simple rendering mode interspersing each pair of elements with both `,\n`.
* Picking the [[lineByLine]] format may be preferable, as it is slightly simpler to parse - each line being a valid json object (no need to trim the comma).
*
* {{{
* {"id":42},
* {"id":43},
* {"id":44}
* }}}
*/
val lineByLineCommaSeparated = akka.http.scaladsl.common.JsonSourceRenderingModes.LineByLineCommaSeparated
}

View file

@ -0,0 +1,22 @@
/*
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.http.javadsl.common
import akka.http.javadsl.model.ContentType
import akka.util.ByteString
/**
* Defines how to render a [[akka.stream.javadsl.Source]] into a raw [[ByteString]]
* output.
*
* This can be used to render a source into an [[akka.http.scaladsl.model.HttpEntity]].
*/
trait SourceRenderingMode {
def contentType: ContentType
def start: ByteString
def between: ByteString
def end: ByteString
}

View file

@ -16,6 +16,7 @@ import akka.japi.Util
import akka.util.ByteString
import scala.concurrent.ExecutionContext
import scala.annotation.unchecked.uncheckedVariance
import scala.language.implicitConversions
object Marshaller {
@ -56,29 +57,29 @@ object Marshaller {
// TODO make sure these are actually usable in a sane way
def wrapEntity[A, C](f: function.BiFunction[ExecutionContext, C, A], m: Marshaller[A, RequestEntity], mediaType: MediaType): Marshaller[C, RequestEntity] = {
val scalaMarshaller = m.asScalaToEntityMarshaller
val scalaMarshaller = m.asScalaCastOutput
fromScala(scalaMarshaller.wrapWithEC(mediaType.asScala) { ctx c: C f(ctx, c) }(ContentTypeOverrider.forEntity))
}
def wrapEntity[A, C, E <: RequestEntity](f: function.Function[C, A], m: Marshaller[A, E], mediaType: MediaType): Marshaller[C, RequestEntity] = {
val scalaMarshaller = m.asScalaToEntityMarshaller
val scalaMarshaller = m.asScalaCastOutput
fromScala(scalaMarshaller.wrap(mediaType.asScala)((in: C) f.apply(in))(ContentTypeOverrider.forEntity))
}
def entityToOKResponse[A](m: Marshaller[A, _ <: RequestEntity]): Marshaller[A, HttpResponse] = {
fromScala(marshalling.Marshaller.fromToEntityMarshaller[A]()(m.asScalaToEntityMarshaller))
fromScala(marshalling.Marshaller.fromToEntityMarshaller[A]()(m.asScalaCastOutput))
}
def entityToResponse[A, R <: RequestEntity](status: StatusCode, m: Marshaller[A, R]): Marshaller[A, HttpResponse] = {
fromScala(marshalling.Marshaller.fromToEntityMarshaller[A](status.asScala)(m.asScalaToEntityMarshaller))
fromScala(marshalling.Marshaller.fromToEntityMarshaller[A](status.asScala)(m.asScalaCastOutput))
}
def entityToResponse[A](status: StatusCode, headers: java.lang.Iterable[HttpHeader], m: Marshaller[A, _ <: RequestEntity]): Marshaller[A, HttpResponse] = {
fromScala(marshalling.Marshaller.fromToEntityMarshaller[A](status.asScala, Util.immutableSeq(headers).map(_.asScala))(m.asScalaToEntityMarshaller)) // TODO can we avoid the map() ?
fromScala(marshalling.Marshaller.fromToEntityMarshaller[A](status.asScala, Util.immutableSeq(headers).map(_.asScala))(m.asScalaCastOutput)) // TODO can we avoid the map() ?
}
def entityToOKResponse[A](headers: java.lang.Iterable[HttpHeader], m: Marshaller[A, _ <: RequestEntity]): Marshaller[A, HttpResponse] = {
fromScala(marshalling.Marshaller.fromToEntityMarshaller[A](headers = Util.immutableSeq(headers).map(_.asScala))(m.asScalaToEntityMarshaller)) // TODO avoid the map()
fromScala(marshalling.Marshaller.fromToEntityMarshaller[A](headers = Util.immutableSeq(headers).map(_.asScala))(m.asScalaCastOutput)) // TODO avoid the map()
}
// these are methods not varargs to avoid call site warning about unchecked type params
@ -140,13 +141,14 @@ object Marshaller {
m.asScala.map(_.asScala)
}
class Marshaller[A, B] private (implicit val asScala: marshalling.Marshaller[A, B]) {
class Marshaller[-A, +B] private (implicit val asScala: marshalling.Marshaller[A, B]) {
import Marshaller.fromScala
/** INTERNAL API: involves unsafe cast (however is very fast) */
// TODO would be nice to not need this special case
def asScalaToEntityMarshaller[C]: marshalling.Marshaller[A, C] = asScala.asInstanceOf[marshalling.Marshaller[A, C]]
private[akka] def asScalaCastOutput[C]: marshalling.Marshaller[A, C] = asScala.asInstanceOf[marshalling.Marshaller[A, C]]
def map[C](f: function.Function[B, C]): Marshaller[A, C] = fromScala(asScala.map(f.apply))
def map[C](f: function.Function[B @uncheckedVariance, C]): Marshaller[A, C] = fromScala(asScala.map(f.apply))
def compose[C](f: function.Function[C, A]): Marshaller[C, B] = fromScala(asScala.compose(f.apply))
def compose[C](f: function.Function[C, A @uncheckedVariance]): Marshaller[C, B] = fromScala(asScala.compose(f.apply))
}

View file

@ -5,11 +5,11 @@
package akka.http.javadsl.server
import akka.http.impl.util.JavaMapping
import akka.http.javadsl.server.directives.TimeoutDirectives
import akka.http.javadsl.server.directives.{ FramedEntityStreamingDirectives, TimeoutDirectives }
import scala.annotation.varargs
abstract class AllDirectives extends TimeoutDirectives
abstract class AllDirectives extends FramedEntityStreamingDirectives
/**
* INTERNAL API

View file

@ -0,0 +1,9 @@
/*
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.http.javadsl.server
class JsonEntityStreaming {
}

View file

@ -10,7 +10,9 @@ import akka.http.impl.util.JavaMapping._
import akka.http.impl.util._
import akka.http.{ javadsl, scaladsl }
import akka.http.scaladsl.server.{ directives sdirectives }
import akka.http.scaladsl.{ common scommon }
import akka.http.javadsl.server.{ directives jdirectives }
import akka.http.javadsl.{ common jcommon }
import scala.collection.immutable
/**
@ -43,6 +45,8 @@ private[http] object RoutingJavaMapping {
}
implicit object convertRouteResult extends Inherited[javadsl.server.RouteResult, scaladsl.server.RouteResult]
implicit object convertSourceRenderingMode extends Inherited[jcommon.SourceRenderingMode, scommon.SourceRenderingMode]
implicit object convertDirectoryRenderer extends Inherited[jdirectives.DirectoryRenderer, sdirectives.FileAndResourceDirectives.DirectoryRenderer]
implicit object convertContentTypeResolver extends Inherited[jdirectives.ContentTypeResolver, sdirectives.ContentTypeResolver]
implicit object convertDirectoryListing extends Inherited[jdirectives.DirectoryListing, sdirectives.DirectoryListing]

View file

@ -3,49 +3,87 @@
*/
package akka.http.javadsl.server.directives
import akka.http.javadsl.model.{ContentType, HttpEntity}
import akka.http.javadsl.model.{ ContentType, HttpEntity }
import akka.util.ByteString
import java.util.{List => JList, Map => JMap}
import java.util.{ List JList, Map JMap }
import java.util.AbstractMap.SimpleImmutableEntry
import java.util.Optional
import java.util.function.{Function => JFunction}
import java.util.function.{ Function JFunction }
import akka.NotUsed
import scala.collection.JavaConverters._
import akka.http.impl.util.JavaMapping.Implicits._
import akka.http.javadsl.server.directives.FramedEntityStreamingDirectives.SourceRenderingMode
import akka.http.javadsl.server.{Route, Unmarshaller}
import akka.http.scaladsl.marshalling.ToResponseMarshallable
import akka.http.scaladsl.server.{FramingWithContentType, Directives => D}
import akka.http.scaladsl.server.directives.ParameterDirectives._
import akka.http.javadsl.common.{ FramingWithContentType, SourceRenderingMode }
import akka.http.javadsl.server.{ Marshaller, Route, Unmarshaller }
import akka.http.javadsl.model._
import akka.http.scaladsl.marshalling.{ ToResponseMarshallable, ToResponseMarshaller }
import akka.http.scaladsl.server.{ Directives D }
import akka.http.scaladsl.unmarshalling
import akka.stream.javadsl.Source
import scala.compat.java8.OptionConverters
/** EXPERIMENTAL API */
trait FramedEntityStreamingDirectives {
abstract class FramedEntityStreamingDirectives extends TimeoutDirectives {
// important import, as we implicitly resolve some marshallers inside the below directives
import akka.http.scaladsl.server.directives.FramedEntityStreamingDirectives._
def entityAsStream[T](clazz: Class[T], um: Unmarshaller[_ >: HttpEntity, T], framing: FramingWithContentType,
@CorrespondsTo("asSourceOf")
def entityasSourceOf[T](um: Unmarshaller[HttpEntity, T], framing: FramingWithContentType,
inner: java.util.function.Function[Source[T, NotUsed], Route]): Route = RouteAdapter {
D.entity[T](D.stream[T](um, framing)) { s =>
val sum = um.asScalaCastInput[akka.http.scaladsl.model.HttpEntity]
D.entity(D.asSourceOf[T](framing.asScala)(sum)) { s: akka.stream.scaladsl.Source[T, NotUsed]
inner(s.asJava).delegate
}
???
}
def completeWithSource[T](source: Source[T, Any], rendering: SourceRenderingMode): Route = RouteAdapter {
@CorrespondsTo("asSourceOfAsync")
def entityAsSourceAsyncOf[T](
parallelism: Int,
um: Unmarshaller[HttpEntity, T], framing: FramingWithContentType,
inner: java.util.function.Function[Source[T, NotUsed], Route]): Route = RouteAdapter {
val sum = um.asScalaCastInput[akka.http.scaladsl.model.HttpEntity]
D.entity(D.asSourceOfAsync[T](parallelism, framing.asScala)(sum)) { s: akka.stream.scaladsl.Source[T, NotUsed]
inner(s.asJava).delegate
}
}
@CorrespondsTo("asSourceOfAsyncUnordered")
def entityAsSourceAsyncUnorderedOf[T](
parallelism: Int,
um: Unmarshaller[HttpEntity, T], framing: FramingWithContentType,
inner: java.util.function.Function[Source[T, NotUsed], Route]): Route = RouteAdapter {
val sum = um.asScalaCastInput[akka.http.scaladsl.model.HttpEntity]
D.entity(D.asSourceOfAsyncUnordered[T](parallelism, framing.asScala)(sum)) { s: akka.stream.scaladsl.Source[T, NotUsed]
inner(s.asJava).delegate
}
}
// implicit used internally, Java caller does not benefit or use it
@CorrespondsTo("complete")
def completeWithSource[T, M](implicit source: Source[T, M], m: Marshaller[T, ByteString], rendering: SourceRenderingMode): Route = RouteAdapter {
import akka.http.scaladsl.marshalling.PredefinedToResponseMarshallers._
implicit val mm = _sourceMarshaller(m.map(ByteStringAsEntityFn), rendering)
val response = ToResponseMarshallable(source)
D.complete(response)
}
}
object FramedEntityStreamingDirectives extends FramedEntityStreamingDirectives {
trait SourceRenderingMode {
def getContentType: ContentType
@CorrespondsTo("complete")
def completeOKWithSource[T, M](implicit source: Source[T, M], m: Marshaller[T, RequestEntity], rendering: SourceRenderingMode): Route = RouteAdapter {
implicit val mm = _sourceMarshaller[T, M](m, rendering)
val response = ToResponseMarshallable(source)
D.complete(response)
}
def start: ByteString
def between: ByteString
def end: ByteString
implicit private def _sourceMarshaller[T, M](implicit m: Marshaller[T, HttpEntity], rendering: SourceRenderingMode) = {
import akka.http.javadsl.server.RoutingJavaMapping._
import akka.http.javadsl.server.RoutingJavaMapping.Implicits._
val mm = m.asScalaCastOutput
D._sourceMarshaller[T, M](mm, rendering.asScala).compose({ h: akka.stream.javadsl.Source[T, M] h.asScala })
}
private[this] val ByteStringAsEntityFn = new java.util.function.Function[ByteString, HttpEntity]() {
override def apply(bs: ByteString): HttpEntity = HttpEntities.create(bs)
}
}
object FramedEntityStreamingDirectives extends FramedEntityStreamingDirectives

View file

@ -30,6 +30,16 @@ abstract class FutureDirectives extends FormFieldDirectives {
}
}
/**
* "Unwraps" a `CompletionStage<T>` and runs the inner route after future
* completion with the future's value as an extraction of type `Try<T>`.
*/
def onComplete[T](cs: CompletionStage[T], inner: JFunction[Try[T], Route]) = RouteAdapter {
D.onComplete(cs.toScala.recover(unwrapCompletionException)) { value
inner(value).delegate
}
}
/**
* "Unwraps" a `CompletionStage[T]` and runs the inner route after future
* completion with the future's value as an extraction of type `T` if

View file

@ -101,6 +101,9 @@ abstract class Unmarshaller[-A, B] extends UnmarshallerBase[A, B] {
implicit def asScala: akka.http.scaladsl.unmarshalling.Unmarshaller[A, B]
/** INTERNAL API */
private[akka] def asScalaCastInput[I]: unmarshalling.Unmarshaller[I, B] = asScala.asInstanceOf[unmarshalling.Unmarshaller[I, B]]
def unmarshall(a: A, ec: ExecutionContext, mat: Materializer): CompletionStage[B] = asScala.apply(a)(ec, mat).toJava
/**

View file

@ -0,0 +1,23 @@
/*
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.http.scaladsl.common
import akka.NotUsed
import akka.event.Logging
import akka.http.scaladsl.model.{ ContentType, ContentTypeRange }
import akka.stream.scaladsl.{ Flow, Framing }
import akka.util.ByteString
/**
* Same as [[akka.stream.scaladsl.Framing]] but additionally can express which [[ContentType]] it supports,
* which can be used to reject routes if content type does not match used framing.
*/
abstract class FramingWithContentType extends akka.http.javadsl.common.FramingWithContentType with Framing {
def flow: Flow[ByteString, ByteString, NotUsed]
override def supported: ContentTypeRange
override def matches(ct: akka.http.javadsl.model.ContentType): Boolean = supported.matches(ct)
override def toString = s"${Logging.simpleName(getClass)}($supported)"
}

View file

@ -0,0 +1,126 @@
/*
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.http.scaladsl.common
import akka.http.scaladsl.model.{ ContentType, ContentTypes }
import akka.util.ByteString
/**
* Specialised rendering mode for streaming elements as JSON.
*
* See also: <a href="https://en.wikipedia.org/wiki/JSON_Streaming">JSON Streaming on Wikipedia</a>.
*/
trait JsonSourceRenderingMode extends akka.http.javadsl.common.JsonSourceRenderingMode with SourceRenderingMode {
override val contentType: ContentType.WithFixedCharset =
ContentTypes.`application/json`
}
/**
* Provides default JSON rendering modes.
*/
object JsonSourceRenderingModes {
/**
* Most compact rendering mode.
* It does not intersperse any separator between the signalled elements.
*
* It is the most compact form to render JSON and can be framed properly by using [[akka.stream.javadsl.JsonFraming.bracketCounting]].
*
* {{{
* {"id":42}{"id":43}{"id":44}
* }}}
*/
object Compact extends JsonSourceRenderingMode {
override val start: ByteString = ByteString.empty
override val between: ByteString = ByteString.empty
override val end: ByteString = ByteString.empty
}
/**
* Simple rendering mode, similar to [[Compact]] however interspersing elements with a `\n` character.
*
* {{{
* {"id":42},{"id":43},{"id":44}
* }}}
*/
object CompactCommaSeparated extends JsonSourceRenderingMode {
override val start: ByteString = ByteString.empty
override val between: ByteString = ByteString(",")
override val end: ByteString = ByteString.empty
}
/**
* Rendering mode useful when the receiving end expects a valid JSON Array.
* It can be useful when the client wants to detect when the stream has been successfully received in-full,
* which it can determine by seeing the terminating `]` character.
*
* The framing's terminal `]` will ONLY be emitted if the stream has completed successfully,
* in other words - the stream has been emitted completely, without errors occuring before the final element has been signaled.
*
* {{{
* [{"id":42},{"id":43},{"id":44}]
* }}}
*/
object ArrayCompact extends JsonSourceRenderingMode {
override val start: ByteString = ByteString("[")
override val between: ByteString = ByteString(",")
override val end: ByteString = ByteString("]")
}
/**
* Rendering mode useful when the receiving end expects a valid JSON Array.
* It can be useful when the client wants to detect when the stream has been successfully received in-full,
* which it can determine by seeing the terminating `]` character.
*
* The framing's terminal `]` will ONLY be emitted if the stream has completed successfully,
* in other words - the stream has been emitted completely, without errors occuring before the final element has been signaled.
*
* {{{
* [{"id":42},
* {"id":43},
* {"id":44}]
* }}}
*/
object ArrayLineByLine extends JsonSourceRenderingMode {
override val start: ByteString = ByteString("[")
override val between: ByteString = ByteString(",\n")
override val end: ByteString = ByteString("]")
}
/**
* Recommended rendering mode.
*
* It is a nice balance between valid and human-readable as well as resonably small size overhead (just the `\n` between elements).
* A good example of API's using this syntax is Twitter's Firehose (last verified at 1.1 version of that API).
*
* {{{
* {"id":42}
* {"id":43}
* {"id":44}
* }}}
*/
object LineByLine extends JsonSourceRenderingMode {
override val start: ByteString = ByteString.empty
override val between: ByteString = ByteString("\n")
override val end: ByteString = ByteString.empty
}
/**
* Simple rendering mode interspersing each pair of elements with both `,\n`.
* Picking the [[LineByLine]] format may be preferable, as it is slightly simpler to parse - each line being a valid json object (no need to trim the comma).
*
* {{{
* {"id":42},
* {"id":43},
* {"id":44}
* }}}
*/
object LineByLineCommaSeparated extends JsonSourceRenderingMode {
override val start: ByteString = ByteString.empty
override val between: ByteString = ByteString(",\n")
override val end: ByteString = ByteString.empty
}
}

View file

@ -0,0 +1,11 @@
/*
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.http.scaladsl.common
import akka.http.scaladsl.model.ContentType
trait SourceRenderingMode extends akka.http.javadsl.common.SourceRenderingMode {
override def contentType: ContentType
}

View file

@ -61,8 +61,8 @@ object StrictForm {
fsu(value.entity.data.decodeString(charsetName))
})
@implicitNotFound("In order to unmarshal a `StrictForm.Field` to type `${T}` you need to supply a " +
"`FromStringUnmarshaller[${T}]` and/or a `FromEntityUnmarshaller[${T}]`")
@implicitNotFound(s"In order to unmarshal a `StrictForm.Field` to type `$${T}` you need to supply a " +
s"`FromStringUnmarshaller[$${T}]` and/or a `FromEntityUnmarshaller[$${T}]`")
sealed trait FieldUnmarshaller[T] {
def unmarshalString(value: String)(implicit ec: ExecutionContext, mat: Materializer): Future[T]
def unmarshalPart(value: Multipart.FormData.BodyPart.Strict)(implicit ec: ExecutionContext, mat: Materializer): Future[T]

View file

@ -4,7 +4,9 @@
package akka.http.scaladsl.marshalling
import scala.concurrent.{ Future, ExecutionContext }
import akka.http.scaladsl.marshalling.Marshalling.Opaque
import scala.concurrent.{ ExecutionContext, Future }
import scala.util.control.NonFatal
import akka.http.scaladsl.model._
import akka.http.scaladsl.util.FastFuture

View file

@ -0,0 +1,69 @@
/*
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.http.scaladsl.server
import akka.NotUsed
import akka.actor.ActorSystem
import akka.event.Logging
import akka.http.scaladsl.common.{ FramingWithContentType, SourceRenderingMode }
import akka.http.scaladsl.model.{ ContentTypeRange, ContentTypes, MediaRange, MediaRanges }
import akka.stream.scaladsl.{ Flow, Framing }
import akka.util.ByteString
import com.typesafe.config.Config
/**
* Entity streaming support, independent of used Json parsing library etc.
*
* Can be extended by various Support traits (e.g. "SprayJsonSupport"),
* in order to provide users with both `framing` (this trait) and `marshalling`
* (implemented by a library) by using a single trait.
*/
trait EntityStreamingSupport extends EntityStreamingSupportBase {
/**
* Implement as `implicit val` with required framing implementation, for example in
* the case of streaming JSON uploads it could be `bracketCountingJsonFraming(maximumObjectLength)`.
*/
def incomingEntityStreamFraming: FramingWithContentType
/**
* Implement as `implicit val` with the rendering mode to be used when redering `Source` instances.
* For example for JSON it could be [[akka.http.scaladsl.common.JsonSourceRenderingMode.CompactArray]]
* or [[akka.http.scaladsl.common.JsonSourceRenderingMode.LineByLine]].
*/
def outgoingEntityStreamRendering: SourceRenderingMode
}
trait EntityStreamingSupportBase {
/** `application/json` specific Framing implementation */
def bracketCountingJsonFraming(maximumObjectLength: Int): FramingWithContentType =
new ApplicationJsonBracketCountingFraming(maximumObjectLength)
/**
* Frames incoming `text / *` entities on a line-by-line basis.
* Useful for accepting `text/csv` uploads as a stream of rows.
*/
def newLineFraming(maximumObjectLength: Int, supportedContentTypes: ContentTypeRange): FramingWithContentType =
new FramingWithContentType {
override final val flow: Flow[ByteString, ByteString, NotUsed] =
Flow[ByteString].via(Framing.delimiter(ByteString("\n"), maximumObjectLength))
override final val supported: ContentTypeRange =
ContentTypeRange(MediaRanges.`text/*`)
}
}
/**
* Entity streaming support, independent of used Json parsing library etc.
*
* Can be extended by various Support traits (e.g. "SprayJsonSupport"),
* in order to provide users with both `framing` (this trait) and `marshalling`
* (implemented by a library) by using a single trait.
*/
object EntityStreamingSupport extends EntityStreamingSupportBase
final class ApplicationJsonBracketCountingFraming(maximumObjectLength: Int) extends FramingWithContentType {
override final val flow = Flow[ByteString].via(akka.stream.scaladsl.JsonFraming.bracketCounting(maximumObjectLength))
override final val supported = ContentTypeRange(ContentTypes.`application/json`)
}

View file

@ -1,168 +0,0 @@
/*
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.http.scaladsl.server
import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.impl.util.JavaMapping
import akka.http.scaladsl.model.{ ContentType, ContentTypes }
import akka.http.scaladsl.server.directives.FramedEntityStreamingDirectives.SourceRenderingMode
import akka.stream.scaladsl.{ Flow, Framing }
import akka.util.ByteString
import com.typesafe.config.Config
import scala.collection.immutable
/**
* Same as [[akka.stream.scaladsl.Framing]] but additionally can express which [[ContentType]] it supports,
* which can be used to reject routes if content type does not match used framing.
*/
abstract class FramingWithContentType extends Framing {
def flow: Flow[ByteString, ByteString, NotUsed]
def supported: immutable.Set[ContentType]
def isSupported(ct: akka.http.javadsl.model.ContentType): Boolean = supported(JavaMapping.ContentType.toScala(ct))
}
object FramingWithContentType {
def apply(framing: Flow[ByteString, ByteString, NotUsed], contentType: ContentType, moreContentTypes: ContentType*) =
new FramingWithContentType {
override def flow = framing
override val supported: immutable.Set[ContentType] =
if (moreContentTypes.isEmpty) Set(contentType)
else Set(contentType) ++ moreContentTypes
}
}
/**
* Json entity streaming support, independent of used Json parsing library.
*
* Can be extended by various Support traits (e.g. "SprayJsonSupport"),
* in order to provide users with both `framing` (this trait) and `marshalling`
* (implemented by a library) by using a single trait.
*/
trait JsonEntityFramingSupport {
/** `application/json` specific Framing implementation */
def bracketCountingJsonFraming(maximumObjectLength: Int) = new FramingWithContentType {
override final val flow = Flow[ByteString].via(akka.stream.scaladsl.JsonFraming.bracketCounting(maximumObjectLength))
override val supported: immutable.Set[ContentType] = Set(ContentTypes.`application/json`)
}
}
object JsonEntityFramingSupport extends JsonEntityFramingSupport
/**
* Specialised rendering mode for streaming elements as JSON.
*
* See also: <a href="https://en.wikipedia.org/wiki/JSON_Streaming">JSON Streaming on Wikipedia</a>.
*/
trait JsonSourceRenderingMode extends SourceRenderingMode {
override val contentType = ContentTypes.`application/json`
}
object JsonSourceRenderingMode {
/**
* Most compact rendering mode
* It does not intersperse any separator between the signalled elements.
*
* {{{
* {"id":42}{"id":43}{"id":44}
* }}}
*/
object Compact extends JsonSourceRenderingMode {
override val start: ByteString = ByteString.empty
override val between: ByteString = ByteString.empty
override val end: ByteString = ByteString.empty
}
/**
* Simple rendering mode, similar to [[Compact]] however interspersing elements with a `\n` character.
*
* {{{
* {"id":42},{"id":43},{"id":44}
* }}}
*/
object CompactCommaSeparated extends JsonSourceRenderingMode {
override val start: ByteString = ByteString.empty
override val between: ByteString = ByteString(",")
override val end: ByteString = ByteString.empty
}
/**
* Rendering mode useful when the receiving end expects a valid JSON Array.
* It can be useful when the client wants to detect when the stream has been successfully received in-full,
* which it can determine by seeing the terminating `]` character.
*
* The framing's terminal `]` will ONLY be emitted if the stream has completed successfully,
* in other words - the stream has been emitted completely, without errors occuring before the final element has been signaled.
*
* {{{
* [{"id":42},{"id":43},{"id":44}]
* }}}
*/
object CompactArray extends JsonSourceRenderingMode {
override val start: ByteString = ByteString("[")
override val between: ByteString = ByteString(",")
override val end: ByteString = ByteString("]")
}
/**
* Recommended rendering mode.
*
* It is a nice balance between valid and human-readable as well as resonably small size overhead (just the `\n` between elements).
* A good example of API's using this syntax is Twitter's Firehose (last verified at 1.1 version of that API).
*
* {{{
* {"id":42}
* {"id":43}
* {"id":44}
* }}}
*/
object LineByLine extends JsonSourceRenderingMode {
override val start: ByteString = ByteString.empty
override val between: ByteString = ByteString("\n")
override val end: ByteString = ByteString.empty
}
/**
* Simple rendering mode interspersing each pair of elements with both `,\n`.
* Picking the [[LineByLine]] format may be preferable, as it is slightly simpler to parse - each line being a valid json object (no need to trim the comma).
*
* {{{
* {"id":42},
* {"id":43},
* {"id":44}
* }}}
*/
object LineByLineCommaSeparated extends JsonSourceRenderingMode {
override val start: ByteString = ByteString.empty
override val between: ByteString = ByteString(",\n")
override val end: ByteString = ByteString.empty
}
}
object JsonStreamingSettings {
def apply(sys: ActorSystem): JsonStreamingSettings =
apply(sys.settings.config.getConfig("akka.http.json-streaming"))
def apply(c: Config): JsonStreamingSettings = {
JsonStreamingSettings(
c.getInt("max-object-size"),
renderingMode(c.getString("rendering-mode")))
}
def renderingMode(name: String): SourceRenderingMode = name match {
case "line-by-line" JsonSourceRenderingMode.LineByLine // the default
case "line-by-line-comma-separated" JsonSourceRenderingMode.LineByLineCommaSeparated
case "compact" JsonSourceRenderingMode.Compact
case "compact-comma-separated" JsonSourceRenderingMode.CompactCommaSeparated
case "compact-array" JsonSourceRenderingMode.CompactArray
}
}
final case class JsonStreamingSettings(
maxObjectSize: Int,
style: SourceRenderingMode)

View file

@ -4,9 +4,9 @@
package akka.http.scaladsl.server.directives
import akka.NotUsed
import akka.http.scaladsl.common.{ FramingWithContentType, SourceRenderingMode }
import akka.http.scaladsl.marshalling._
import akka.http.scaladsl.model._
import akka.http.scaladsl.server.FramingWithContentType
import akka.http.scaladsl.unmarshalling.{ Unmarshal, Unmarshaller, _ }
import akka.http.scaladsl.util.FastFuture
import akka.stream.Materializer
@ -28,33 +28,31 @@ trait FramedEntityStreamingDirectives extends MarshallingDirectives {
// TODO DOCS
final def stream[T](implicit um: Unmarshaller[ByteString, T], framing: FramingWithContentType): RequestToSourceUnmarshaller[T] =
streamAsync(1)(um, framing)
final def stream[T](framing: FramingWithContentType)(implicit um: Unmarshaller[ByteString, T]): RequestToSourceUnmarshaller[T] =
streamAsync(1)(um, framing)
final def asSourceOf[T](implicit um: Unmarshaller[HttpEntity, T], framing: FramingWithContentType): RequestToSourceUnmarshaller[T] =
asSourceOfAsync(1)(um, framing)
final def asSourceOf[T](framing: FramingWithContentType)(implicit um: Unmarshaller[HttpEntity, T]): RequestToSourceUnmarshaller[T] =
asSourceOfAsync(1)(um, framing)
final def streamAsync[T](parallelism: Int)(implicit um: Unmarshaller[ByteString, T], framing: FramingWithContentType): RequestToSourceUnmarshaller[T] =
streamInternal[T](framing, (ec, mat) Flow[ByteString].mapAsync(parallelism)(Unmarshal(_).to[T](um, ec, mat)))
final def streamAsync[T](parallelism: Int, framing: FramingWithContentType)(implicit um: Unmarshaller[ByteString, T]): RequestToSourceUnmarshaller[T] =
streamAsync(parallelism)(um, framing)
final def asSourceOfAsync[T](parallelism: Int)(implicit um: Unmarshaller[HttpEntity, T], framing: FramingWithContentType): RequestToSourceUnmarshaller[T] =
asSourceOfInternal[T](framing, (ec, mat) Flow[HttpEntity].mapAsync(parallelism)(Unmarshal(_).to[T](um, ec, mat)))
final def asSourceOfAsync[T](parallelism: Int, framing: FramingWithContentType)(implicit um: Unmarshaller[HttpEntity, T]): RequestToSourceUnmarshaller[T] =
asSourceOfAsync(parallelism)(um, framing)
final def streamAsyncUnordered[T](parallelism: Int)(implicit um: Unmarshaller[ByteString, T], framing: FramingWithContentType): RequestToSourceUnmarshaller[T] =
streamInternal[T](framing, (ec, mat) Flow[ByteString].mapAsyncUnordered(parallelism)(Unmarshal(_).to[T](um, ec, mat)))
final def streamAsyncUnordered[T](parallelism: Int, framing: FramingWithContentType)(implicit um: Unmarshaller[ByteString, T]): RequestToSourceUnmarshaller[T] =
streamAsyncUnordered(parallelism)(um, framing)
// TODO materialized value may want to be "drain/cancel" or something like it?
// TODO could expose `streamMat`, for more fine grained picking of Marshaller
final def asSourceOfAsyncUnordered[T](parallelism: Int)(implicit um: Unmarshaller[HttpEntity, T], framing: FramingWithContentType): RequestToSourceUnmarshaller[T] =
asSourceOfInternal[T](framing, (ec, mat) Flow[HttpEntity].mapAsyncUnordered(parallelism)(Unmarshal(_).to[T](um, ec, mat)))
final def asSourceOfAsyncUnordered[T](parallelism: Int, framing: FramingWithContentType)(implicit um: Unmarshaller[HttpEntity, T]): RequestToSourceUnmarshaller[T] =
asSourceOfAsyncUnordered(parallelism)(um, framing)
// format: OFF
private def streamInternal[T](framing: FramingWithContentType, marshalling: (ExecutionContext, Materializer) => Flow[ByteString, ByteString, NotUsed]#ReprMat[T, NotUsed]): RequestToSourceUnmarshaller[T] =
private def asSourceOfInternal[T](framing: FramingWithContentType, marshalling: (ExecutionContext, Materializer) => Flow[HttpEntity, ByteString, NotUsed]#ReprMat[T, NotUsed]): RequestToSourceUnmarshaller[T] =
Unmarshaller.withMaterializer[HttpRequest, Source[T, NotUsed]] { implicit ec implicit mat req
val entity = req.entity
if (!framing.supported(entity.contentType)) {
val supportedContentTypes = framing.supported.map(ContentTypeRange(_))
if (!framing.matches(entity.contentType)) {
val supportedContentTypes = framing.supported
FastFuture.failed(Unmarshaller.UnsupportedContentTypeException(supportedContentTypes))
} else {
val stream = entity.dataBytes.via(framing.flow).via(marshalling(ec, mat)).mapMaterializedValue(_ => NotUsed)
// val stream = entity.dataBytes.via(framing.flow).via(marshalling(ec, mat)).mapMaterializedValue(_ => NotUsed)
val stream = Source.single(entity.transformDataBytes(framing.flow)).via(marshalling(ec, mat)).mapMaterializedValue(_ => NotUsed)
FastFuture.successful(stream)
}
}
@ -65,7 +63,7 @@ trait FramedEntityStreamingDirectives extends MarshallingDirectives {
implicit def _sourceMarshaller[T, M](implicit m: ToEntityMarshaller[T], mode: SourceRenderingMode): ToResponseMarshaller[Source[T, M]] =
Marshaller[Source[T, M], HttpResponse] { implicit ec source
FastFuture successful {
Marshalling.WithFixedContentType(mode.contentType, () { // TODO charset?
Marshalling.WithFixedContentType(mode.contentType, () {
val bytes = source
.mapAsync(1)(t Marshal(t).to[HttpEntity])
.map(_.dataBytes)
@ -79,7 +77,7 @@ trait FramedEntityStreamingDirectives extends MarshallingDirectives {
implicit def _sourceParallelismMarshaller[T](implicit m: ToEntityMarshaller[T], mode: SourceRenderingMode): ToResponseMarshaller[AsyncRenderingOf[T]] =
Marshaller[AsyncRenderingOf[T], HttpResponse] { implicit ec rendering
FastFuture successful {
Marshalling.WithFixedContentType(mode.contentType, () { // TODO charset?
Marshalling.WithFixedContentType(mode.contentType, () {
val bytes = rendering.source
.mapAsync(rendering.parallelism)(t Marshal(t).to[HttpEntity])
.map(_.dataBytes)
@ -93,7 +91,7 @@ trait FramedEntityStreamingDirectives extends MarshallingDirectives {
implicit def _sourceUnorderedMarshaller[T](implicit m: ToEntityMarshaller[T], mode: SourceRenderingMode): ToResponseMarshaller[AsyncUnorderedRenderingOf[T]] =
Marshaller[AsyncUnorderedRenderingOf[T], HttpResponse] { implicit ec rendering
FastFuture successful {
Marshalling.WithFixedContentType(mode.contentType, () { // TODO charset?
Marshalling.WithFixedContentType(mode.contentType, () {
val bytes = rendering.source
.mapAsync(rendering.parallelism)(t Marshal(t).to[HttpEntity])
.map(_.dataBytes)
@ -106,32 +104,34 @@ trait FramedEntityStreamingDirectives extends MarshallingDirectives {
// special rendering modes
implicit def enableSpecialSourceRenderingModes[T](source: Source[T, Any]): EnableSpecialSourceRenderingModes[T] =
implicit def _enableSpecialSourceRenderingModes[T](source: Source[T, Any]): EnableSpecialSourceRenderingModes[T] =
new EnableSpecialSourceRenderingModes(source)
}
object FramedEntityStreamingDirectives extends FramedEntityStreamingDirectives {
/**
* Defines ByteStrings to be injected before the first, between, and after all elements of a [[Source]],
* when used to complete a request.
*
* A typical example would be rendering a ``Source[T, _]`` as JSON array,
* where start is `[`, between is `,`, and end is `]` - which procudes a valid json array, assuming each element can
* be properly marshalled as JSON object.
*
* The corresponding values will typically be put into an [[Source.intersperse]] call on the to-be-rendered Source.
*/
trait SourceRenderingMode extends akka.http.javadsl.server.directives.FramedEntityStreamingDirectives.SourceRenderingMode {
override final def getContentType = contentType
def contentType: ContentType
}
final class AsyncRenderingOf[T](val source: Source[T, Any], val parallelism: Int)
final class AsyncUnorderedRenderingOf[T](val source: Source[T, Any], val parallelism: Int)
sealed class AsyncSourceRenderingMode
final class AsyncRenderingOf[T](val source: Source[T, Any], val parallelism: Int) extends AsyncSourceRenderingMode
final class AsyncUnorderedRenderingOf[T](val source: Source[T, Any], val parallelism: Int) extends AsyncSourceRenderingMode
}
final class EnableSpecialSourceRenderingModes[T](val source: Source[T, Any]) extends AnyVal {
/**
* Causes the response stream to be marshalled asynchronously (up to `parallelism` elements at once),
* while retaining the ordering of incoming elements.
*
* See also [[Source.mapAsync]].
*/
def renderAsync(parallelism: Int) = new FramedEntityStreamingDirectives.AsyncRenderingOf(source, parallelism)
/**
* Causes the response stream to be marshalled asynchronously (up to `parallelism` elements at once),
* emitting the first one that finished marshalling onto the wire.
*
* This sacrifices ordering of the incoming data in regards to data actually rendered onto the wire,
* but may be faster if some elements are smaller than other ones by not stalling the small elements
* from being written while the large one still is being marshalled.
*
* See also [[Source.mapAsyncUnordered]].
*/
def renderAsyncUnordered(parallelism: Int) = new FramedEntityStreamingDirectives.AsyncUnorderedRenderingOf(source, parallelism)
}

View file

@ -115,3 +115,18 @@ object Framing {
scaladsl.Framing.simpleFramingProtocol(maximumMessageLength).asJava
}
/**
* Wrapper around a framing Flow (as provided by [[Framing.delimiter]] for example.
* Used for providing a framing implicitly for other components which may need one (such as framed entity streaming in Akka HTTP).
*/
trait Framing {
def asScala: akka.stream.scaladsl.Framing =
this match {
case f: akka.stream.scaladsl.Framing f
case _ new akka.stream.scaladsl.Framing {
override def flow = getFlow.asScala
}
}
def getFlow: Flow[ByteString, ByteString, NotUsed]
}

View file

@ -292,6 +292,9 @@ object Framing {
* Wrapper around a framing Flow (as provided by [[Framing.delimiter]] for example.
* Used for providing a framing implicitly for other components which may need one (such as framed entity streaming in Akka HTTP).
*/
trait Framing {
trait Framing extends akka.stream.javadsl.Framing {
final def asJava: akka.stream.javadsl.Framing = this
override final def getFlow = flow.asJava
def flow: Flow[ByteString, ByteString, NotUsed]
}