=htp #16934 s/ActorFlowMaterializer/FlowMaterializer where possible, smaller cleanups
This commit is contained in:
parent
abea2c88ef
commit
ef208bfa8b
10 changed files with 45 additions and 52 deletions
|
|
@ -4,25 +4,23 @@
|
|||
|
||||
package akka.http.testkit
|
||||
|
||||
import akka.http.unmarshalling.{ Unmarshal, FromEntityUnmarshaller }
|
||||
|
||||
import scala.concurrent.duration._
|
||||
import scala.concurrent.{ ExecutionContext, Await }
|
||||
|
||||
import akka.http.unmarshalling.{ Unmarshal, FromEntityUnmarshaller }
|
||||
import akka.http.marshalling._
|
||||
import akka.http.model.HttpEntity
|
||||
import akka.stream.ActorFlowMaterializer
|
||||
import akka.stream.FlowMaterializer
|
||||
|
||||
import scala.util.Try
|
||||
|
||||
trait MarshallingTestUtils {
|
||||
def marshal[T: ToEntityMarshaller](value: T)(implicit ec: ExecutionContext, mat: ActorFlowMaterializer): HttpEntity.Strict =
|
||||
def marshal[T: ToEntityMarshaller](value: T)(implicit ec: ExecutionContext, mat: FlowMaterializer): HttpEntity.Strict =
|
||||
Await.result(Marshal(value).to[HttpEntity].flatMap(_.toStrict(1.second)), 1.second)
|
||||
|
||||
def unmarshalValue[T: FromEntityUnmarshaller](entity: HttpEntity)(implicit ec: ExecutionContext, mat: ActorFlowMaterializer): T =
|
||||
def unmarshalValue[T: FromEntityUnmarshaller](entity: HttpEntity)(implicit ec: ExecutionContext, mat: FlowMaterializer): T =
|
||||
unmarshal(entity).get
|
||||
|
||||
def unmarshal[T: FromEntityUnmarshaller](entity: HttpEntity)(implicit ec: ExecutionContext, mat: ActorFlowMaterializer): Try[T] = {
|
||||
def unmarshal[T: FromEntityUnmarshaller](entity: HttpEntity)(implicit ec: ExecutionContext, mat: FlowMaterializer): Try[T] = {
|
||||
val fut = Unmarshal(entity).to[T]
|
||||
Await.ready(fut, 1.second)
|
||||
fut.value.get
|
||||
|
|
|
|||
|
|
@ -9,7 +9,7 @@ import scala.collection.immutable
|
|||
import scala.concurrent.duration._
|
||||
import scala.concurrent.ExecutionContext
|
||||
import akka.http.util._
|
||||
import akka.stream.ActorFlowMaterializer
|
||||
import akka.stream.FlowMaterializer
|
||||
import akka.stream.scaladsl._
|
||||
import akka.http.model.HttpEntity.ChunkStreamPart
|
||||
import akka.http.server._
|
||||
|
|
@ -22,7 +22,7 @@ trait RouteTestResultComponent {
|
|||
/**
|
||||
* A receptacle for the response or rejections created by a route.
|
||||
*/
|
||||
class RouteTestResult(timeout: FiniteDuration)(implicit fm: ActorFlowMaterializer) {
|
||||
class RouteTestResult(timeout: FiniteDuration)(implicit fm: FlowMaterializer) {
|
||||
private[this] var result: Option[Either[immutable.Seq[Rejection], HttpResponse]] = None
|
||||
private[this] val latch = new CountDownLatch(1)
|
||||
|
||||
|
|
|
|||
|
|
@ -4,15 +4,14 @@
|
|||
|
||||
package akka.http.testkit
|
||||
|
||||
import akka.http.model.HttpEntity
|
||||
import akka.http.unmarshalling.FromEntityUnmarshaller
|
||||
import akka.stream.ActorFlowMaterializer
|
||||
import scala.util.Try
|
||||
import scala.concurrent.{ ExecutionContext, Future, Await }
|
||||
import scala.concurrent.duration._
|
||||
import org.scalatest.Suite
|
||||
import org.scalatest.matchers.Matcher
|
||||
|
||||
import scala.concurrent.duration._
|
||||
import scala.concurrent.{ ExecutionContext, Future, Await }
|
||||
import scala.util.Try
|
||||
import akka.http.model.HttpEntity
|
||||
import akka.http.unmarshalling.FromEntityUnmarshaller
|
||||
import akka.stream.FlowMaterializer
|
||||
|
||||
trait ScalatestUtils extends MarshallingTestUtils {
|
||||
import org.scalatest.Matchers._
|
||||
|
|
@ -22,10 +21,10 @@ trait ScalatestUtils extends MarshallingTestUtils {
|
|||
def haveFailedWith(t: Throwable): Matcher[Future[_]] =
|
||||
equal(t).matcher[Throwable] compose (x ⇒ Await.result(x.failed, 1.second))
|
||||
|
||||
def unmarshalToValue[T: FromEntityUnmarshaller](value: T)(implicit ec: ExecutionContext, mat: ActorFlowMaterializer): Matcher[HttpEntity] =
|
||||
def unmarshalToValue[T: FromEntityUnmarshaller](value: T)(implicit ec: ExecutionContext, mat: FlowMaterializer): Matcher[HttpEntity] =
|
||||
equal(value).matcher[T] compose (unmarshalValue(_))
|
||||
|
||||
def unmarshalTo[T: FromEntityUnmarshaller](value: Try[T])(implicit ec: ExecutionContext, mat: ActorFlowMaterializer): Matcher[HttpEntity] =
|
||||
def unmarshalTo[T: FromEntityUnmarshaller](value: Try[T])(implicit ec: ExecutionContext, mat: FlowMaterializer): Matcher[HttpEntity] =
|
||||
equal(value).matcher[Try[T]] compose (unmarshal(_))
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -5,8 +5,7 @@
|
|||
package akka.http.coding
|
||||
|
||||
import akka.http.model._
|
||||
import akka.http.util.StreamUtils
|
||||
import akka.stream.ActorFlowMaterializer
|
||||
import akka.stream.FlowMaterializer
|
||||
import akka.stream.stage.Stage
|
||||
import akka.util.ByteString
|
||||
import headers.HttpEncoding
|
||||
|
|
@ -28,7 +27,7 @@ trait Decoder {
|
|||
def withMaxBytesPerChunk(maxBytesPerChunk: Int): Decoder
|
||||
|
||||
def decoderFlow: Flow[ByteString, ByteString, Unit]
|
||||
def decode(input: ByteString)(implicit mat: ActorFlowMaterializer): Future[ByteString] =
|
||||
def decode(input: ByteString)(implicit mat: FlowMaterializer): Future[ByteString] =
|
||||
Source.single(input).via(decoderFlow).runWith(Sink.head())
|
||||
}
|
||||
object Decoder {
|
||||
|
|
|
|||
|
|
@ -8,7 +8,7 @@ import scala.annotation.implicitNotFound
|
|||
import scala.collection.immutable
|
||||
import scala.concurrent.{ ExecutionContext, Future }
|
||||
import scala.concurrent.duration._
|
||||
import akka.stream.ActorFlowMaterializer
|
||||
import akka.stream.FlowMaterializer
|
||||
import akka.http.util.FastFuture
|
||||
import akka.http.unmarshalling._
|
||||
import akka.http.model._
|
||||
|
|
@ -87,7 +87,7 @@ object StrictForm {
|
|||
|
||||
implicit def unmarshaller(implicit formDataUM: FromEntityUnmarshaller[FormData],
|
||||
multipartUM: FromEntityUnmarshaller[Multipart.FormData],
|
||||
ec: ExecutionContext, fm: ActorFlowMaterializer): FromEntityUnmarshaller[StrictForm] = {
|
||||
ec: ExecutionContext, fm: FlowMaterializer): FromEntityUnmarshaller[StrictForm] = {
|
||||
|
||||
def tryUnmarshalToQueryForm(entity: HttpEntity): Future[StrictForm] =
|
||||
for (formData ← formDataUM(entity).fast) yield {
|
||||
|
|
|
|||
|
|
@ -4,9 +4,8 @@
|
|||
|
||||
package akka.http.server
|
||||
|
||||
import akka.stream.ActorFlowMaterializer
|
||||
|
||||
import scala.concurrent.{ Future, ExecutionContext }
|
||||
import akka.stream.FlowMaterializer
|
||||
import akka.event.LoggingAdapter
|
||||
import akka.http.marshalling.ToResponseMarshallable
|
||||
import akka.http.model._
|
||||
|
|
@ -31,7 +30,7 @@ trait RequestContext {
|
|||
/**
|
||||
* The default FlowMaterializer.
|
||||
*/
|
||||
implicit def flowMaterializer: ActorFlowMaterializer
|
||||
implicit def flowMaterializer: FlowMaterializer
|
||||
|
||||
/**
|
||||
* The default LoggingAdapter to be used for logging messages related to this request.
|
||||
|
|
@ -48,7 +47,7 @@ trait RequestContext {
|
|||
*/
|
||||
def reconfigure(
|
||||
executionContext: ExecutionContext = executionContext,
|
||||
flowMaterializer: ActorFlowMaterializer = flowMaterializer,
|
||||
flowMaterializer: FlowMaterializer = flowMaterializer,
|
||||
log: LoggingAdapter = log,
|
||||
settings: RoutingSettings = settings): RequestContext
|
||||
|
||||
|
|
@ -82,7 +81,7 @@ trait RequestContext {
|
|||
/**
|
||||
* Returns a copy of this context with the new HttpRequest.
|
||||
*/
|
||||
def withFlowMaterializer(materializer: ActorFlowMaterializer): RequestContext
|
||||
def withFlowMaterializer(materializer: FlowMaterializer): RequestContext
|
||||
|
||||
/**
|
||||
* Returns a copy of this context with the new LoggingAdapter.
|
||||
|
|
|
|||
|
|
@ -4,9 +4,8 @@
|
|||
|
||||
package akka.http.server
|
||||
|
||||
import akka.stream.ActorFlowMaterializer
|
||||
|
||||
import scala.concurrent.{ Future, ExecutionContext }
|
||||
import akka.stream.FlowMaterializer
|
||||
import akka.event.LoggingAdapter
|
||||
import akka.http.marshalling.{ Marshal, ToResponseMarshallable }
|
||||
import akka.http.util.FastFuture
|
||||
|
|
@ -20,14 +19,14 @@ private[http] class RequestContextImpl(
|
|||
val request: HttpRequest,
|
||||
val unmatchedPath: Uri.Path,
|
||||
val executionContext: ExecutionContext,
|
||||
val flowMaterializer: ActorFlowMaterializer,
|
||||
val flowMaterializer: FlowMaterializer,
|
||||
val log: LoggingAdapter,
|
||||
val settings: RoutingSettings) extends RequestContext {
|
||||
|
||||
def this(request: HttpRequest, log: LoggingAdapter, settings: RoutingSettings)(implicit ec: ExecutionContext, materializer: ActorFlowMaterializer) =
|
||||
def this(request: HttpRequest, log: LoggingAdapter, settings: RoutingSettings)(implicit ec: ExecutionContext, materializer: FlowMaterializer) =
|
||||
this(request, request.uri.path, ec, materializer, log, settings)
|
||||
|
||||
def reconfigure(executionContext: ExecutionContext, flowMaterializer: ActorFlowMaterializer, log: LoggingAdapter, settings: RoutingSettings): RequestContext =
|
||||
def reconfigure(executionContext: ExecutionContext, flowMaterializer: FlowMaterializer, log: LoggingAdapter, settings: RoutingSettings): RequestContext =
|
||||
copy(executionContext = executionContext, flowMaterializer = flowMaterializer, log = log, settings = settings)
|
||||
|
||||
override def complete(trm: ToResponseMarshallable): Future[RouteResult] =
|
||||
|
|
@ -51,7 +50,7 @@ private[http] class RequestContextImpl(
|
|||
override def withExecutionContext(executionContext: ExecutionContext): RequestContext =
|
||||
if (executionContext != this.executionContext) copy(executionContext = executionContext) else this
|
||||
|
||||
override def withFlowMaterializer(flowMaterializer: ActorFlowMaterializer): RequestContext =
|
||||
override def withFlowMaterializer(flowMaterializer: FlowMaterializer): RequestContext =
|
||||
if (flowMaterializer != this.flowMaterializer) copy(flowMaterializer = flowMaterializer) else this
|
||||
|
||||
override def withLog(log: LoggingAdapter): RequestContext =
|
||||
|
|
@ -85,7 +84,7 @@ private[http] class RequestContextImpl(
|
|||
private def copy(request: HttpRequest = request,
|
||||
unmatchedPath: Uri.Path = unmatchedPath,
|
||||
executionContext: ExecutionContext = executionContext,
|
||||
flowMaterializer: ActorFlowMaterializer = flowMaterializer,
|
||||
flowMaterializer: FlowMaterializer = flowMaterializer,
|
||||
log: LoggingAdapter = log,
|
||||
settings: RoutingSettings = settings) =
|
||||
new RequestContextImpl(request, unmatchedPath, executionContext, flowMaterializer, log, settings)
|
||||
|
|
|
|||
|
|
@ -7,7 +7,7 @@ package akka.http.server
|
|||
import scala.concurrent.ExecutionContext
|
||||
import akka.event.LoggingAdapter
|
||||
import akka.actor.{ ActorSystem, ActorContext }
|
||||
import akka.stream.ActorFlowMaterializer
|
||||
import akka.stream.FlowMaterializer
|
||||
import akka.http.Http
|
||||
import akka.http.model.HttpRequest
|
||||
|
||||
|
|
@ -34,12 +34,12 @@ class RoutingSetup(
|
|||
val exceptionHandler: ExceptionHandler,
|
||||
val rejectionHandler: RejectionHandler,
|
||||
val executionContext: ExecutionContext,
|
||||
val flowMaterializer: ActorFlowMaterializer,
|
||||
val flowMaterializer: FlowMaterializer,
|
||||
val routingLog: RoutingLog) {
|
||||
|
||||
// enable `import setup._` to properly bring implicits in scope
|
||||
implicit def executor: ExecutionContext = executionContext
|
||||
implicit def materializer: ActorFlowMaterializer = flowMaterializer
|
||||
implicit def materializer: FlowMaterializer = flowMaterializer
|
||||
}
|
||||
|
||||
object RoutingSetup {
|
||||
|
|
@ -47,7 +47,7 @@ object RoutingSetup {
|
|||
exceptionHandler: ExceptionHandler = null,
|
||||
rejectionHandler: RejectionHandler = null,
|
||||
executionContext: ExecutionContext,
|
||||
flowMaterializer: ActorFlowMaterializer,
|
||||
flowMaterializer: FlowMaterializer,
|
||||
routingLog: RoutingLog): RoutingSetup =
|
||||
new RoutingSetup(
|
||||
routingSettings,
|
||||
|
|
|
|||
|
|
@ -5,11 +5,10 @@
|
|||
package akka.http.server
|
||||
package directives
|
||||
|
||||
import akka.event.LoggingAdapter
|
||||
import akka.stream.ActorFlowMaterializer
|
||||
|
||||
import scala.concurrent.{ Future, ExecutionContext }
|
||||
import scala.collection.immutable
|
||||
import akka.event.LoggingAdapter
|
||||
import akka.stream.FlowMaterializer
|
||||
import akka.http.server.util.Tuple
|
||||
import akka.http.util.FastFuture
|
||||
import akka.http.model._
|
||||
|
|
@ -144,13 +143,13 @@ trait BasicDirectives {
|
|||
/**
|
||||
* Runs its inner route with the given alternative [[FlowMaterializer]].
|
||||
*/
|
||||
def withFlowMaterializer(materializer: ActorFlowMaterializer): Directive0 =
|
||||
def withFlowMaterializer(materializer: FlowMaterializer): Directive0 =
|
||||
mapRequestContext(_ withFlowMaterializer materializer)
|
||||
|
||||
/**
|
||||
* Extracts the [[ExecutionContext]] from the [[RequestContext]].
|
||||
*/
|
||||
def extractFlowMaterializer: Directive1[ActorFlowMaterializer] = BasicDirectives._extractFlowMaterializer
|
||||
def extractFlowMaterializer: Directive1[FlowMaterializer] = BasicDirectives._extractFlowMaterializer
|
||||
|
||||
/**
|
||||
* Runs its inner route with the given alternative [[LoggingAdapter]].
|
||||
|
|
@ -193,7 +192,7 @@ object BasicDirectives extends BasicDirectives {
|
|||
private val _extractRequest: Directive1[HttpRequest] = extract(_.request)
|
||||
private val _extractUri: Directive1[Uri] = extract(_.request.uri)
|
||||
private val _extractExecutionContext: Directive1[ExecutionContext] = extract(_.executionContext)
|
||||
private val _extractFlowMaterializer: Directive1[ActorFlowMaterializer] = extract(_.flowMaterializer)
|
||||
private val _extractFlowMaterializer: Directive1[FlowMaterializer] = extract(_.flowMaterializer)
|
||||
private val _extractLog: Directive1[LoggingAdapter] = extract(_.log)
|
||||
private val _extractSettings: Directive1[RoutingSettings] = extract(_.settings)
|
||||
private val _extractRequestContext: Directive1[RequestContext] = extract(akka.http.util.identityFunc)
|
||||
|
|
|
|||
|
|
@ -5,24 +5,24 @@
|
|||
package akka.http.unmarshalling
|
||||
|
||||
import scala.concurrent.ExecutionContext
|
||||
import akka.stream.ActorFlowMaterializer
|
||||
import akka.stream.FlowMaterializer
|
||||
import akka.util.ByteString
|
||||
import akka.http.util.FastFuture
|
||||
import akka.http.model._
|
||||
|
||||
trait PredefinedFromEntityUnmarshallers extends MultipartUnmarshallers {
|
||||
|
||||
implicit def byteStringUnmarshaller(implicit fm: ActorFlowMaterializer): FromEntityUnmarshaller[ByteString] =
|
||||
implicit def byteStringUnmarshaller(implicit fm: FlowMaterializer): FromEntityUnmarshaller[ByteString] =
|
||||
Unmarshaller {
|
||||
case HttpEntity.Strict(_, data) ⇒ FastFuture.successful(data)
|
||||
case entity ⇒ entity.dataBytes.runFold(ByteString.empty)(_ ++ _)
|
||||
}
|
||||
|
||||
implicit def byteArrayUnmarshaller(implicit fm: ActorFlowMaterializer,
|
||||
implicit def byteArrayUnmarshaller(implicit fm: FlowMaterializer,
|
||||
ec: ExecutionContext): FromEntityUnmarshaller[Array[Byte]] =
|
||||
byteStringUnmarshaller.map(_.toArray[Byte])
|
||||
|
||||
implicit def charArrayUnmarshaller(implicit fm: ActorFlowMaterializer,
|
||||
implicit def charArrayUnmarshaller(implicit fm: FlowMaterializer,
|
||||
ec: ExecutionContext): FromEntityUnmarshaller[Array[Char]] =
|
||||
byteStringUnmarshaller(fm) mapWithInput { (entity, bytes) ⇒
|
||||
val charBuffer = entity.contentType.charset.nioCharset.decode(bytes.asByteBuffer)
|
||||
|
|
@ -31,17 +31,17 @@ trait PredefinedFromEntityUnmarshallers extends MultipartUnmarshallers {
|
|||
array
|
||||
}
|
||||
|
||||
implicit def stringUnmarshaller(implicit fm: ActorFlowMaterializer,
|
||||
implicit def stringUnmarshaller(implicit fm: FlowMaterializer,
|
||||
ec: ExecutionContext): FromEntityUnmarshaller[String] =
|
||||
byteStringUnmarshaller(fm) mapWithInput { (entity, bytes) ⇒
|
||||
// FIXME: add `ByteString::decodeString(java.nio.Charset): String` overload!!!
|
||||
bytes.decodeString(entity.contentType.charset.nioCharset.name) // ouch!!!
|
||||
}
|
||||
|
||||
implicit def defaultUrlEncodedFormDataUnmarshaller(implicit fm: ActorFlowMaterializer,
|
||||
implicit def defaultUrlEncodedFormDataUnmarshaller(implicit fm: FlowMaterializer,
|
||||
ec: ExecutionContext): FromEntityUnmarshaller[FormData] =
|
||||
urlEncodedFormDataUnmarshaller(MediaTypes.`application/x-www-form-urlencoded`)
|
||||
def urlEncodedFormDataUnmarshaller(ranges: ContentTypeRange*)(implicit fm: ActorFlowMaterializer,
|
||||
def urlEncodedFormDataUnmarshaller(ranges: ContentTypeRange*)(implicit fm: FlowMaterializer,
|
||||
ec: ExecutionContext): FromEntityUnmarshaller[FormData] =
|
||||
stringUnmarshaller.forContentTypes(ranges: _*).mapWithInput { (entity, string) ⇒
|
||||
try {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue