=hco,htp Upgrade to latest release-2.3-dev head
This commit is contained in:
parent
991293b25f
commit
1c3bd3817e
7 changed files with 25 additions and 25 deletions
|
|
@ -181,7 +181,7 @@ object HttpEntity {
|
|||
*/
|
||||
def apply(contentType: ContentType, chunks: Publisher[ByteString], materializer: FlowMaterializer): Chunked =
|
||||
Chunked(contentType, Flow(chunks).collect[ChunkStreamPart] {
|
||||
case b: ByteString if b.nonEmpty => Chunk(b)
|
||||
case b: ByteString if b.nonEmpty ⇒ Chunk(b)
|
||||
}.toPublisher(materializer))
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -46,14 +46,14 @@ object MultipartByteRanges {
|
|||
* All parts must contain a Content-Disposition header with a type form-data
|
||||
* and a name parameter that is unique.
|
||||
*/
|
||||
final case class MultipartFormData(parts: Producer[BodyPart]) extends MultipartParts {
|
||||
final case class MultipartFormData(parts: Publisher[BodyPart]) extends MultipartParts {
|
||||
// def get(partName: String): Option[BodyPart] = fields.find(_.name.exists(_ == partName))
|
||||
}
|
||||
|
||||
object MultipartFormData {
|
||||
val Empty = MultipartFormData()
|
||||
|
||||
def apply(parts: BodyPart*): MultipartFormData = apply(SynchronousProducerFromIterable[BodyPart](parts.toList))
|
||||
def apply(parts: BodyPart*): MultipartFormData = apply(SynchronousPublisherFromIterable[BodyPart](parts.toList))
|
||||
|
||||
def apply(fields: Map[String, BodyPart]): MultipartFormData = apply {
|
||||
fields.map {
|
||||
|
|
|
|||
|
|
@ -5,7 +5,7 @@
|
|||
package akka.http.parsing
|
||||
|
||||
import akka.event.LoggingAdapter
|
||||
import org.reactivestreams.api.Producer
|
||||
import org.reactivestreams.Publisher
|
||||
import scala.annotation.tailrec
|
||||
import scala.collection.immutable
|
||||
import scala.collection.mutable.ListBuffer
|
||||
|
|
@ -141,7 +141,7 @@ private[http] final class BodyPartParser(defaultContentType: ContentType,
|
|||
emitPartChunk: (List[HttpHeader], ContentType, ByteString) ⇒ Unit = {
|
||||
(headers, ct, bytes) ⇒
|
||||
emit(BodyPartStart(headers, entityParts ⇒ HttpEntity.CloseDelimited(ct,
|
||||
Flow(entityParts).collect { case EntityPart(data) ⇒ data }.toProducer(materializer))))
|
||||
Flow(entityParts).collect { case EntityPart(data) ⇒ data }.toPublisher(materializer))))
|
||||
emit(bytes)
|
||||
},
|
||||
emitFinalPartChunk: (List[HttpHeader], ContentType, ByteString) ⇒ Unit = {
|
||||
|
|
@ -214,7 +214,7 @@ private[http] object BodyPartParser {
|
|||
val boundaryCharNoSpace = CharPredicate.Digit ++ CharPredicate.Alpha ++ "'()+_,-./:=?"
|
||||
|
||||
sealed trait Output
|
||||
final case class BodyPartStart(headers: List[HttpHeader], createEntity: Producer[Output] ⇒ HttpEntity) extends Output
|
||||
final case class BodyPartStart(headers: List[HttpHeader], createEntity: Publisher[Output] ⇒ HttpEntity) extends Output
|
||||
final case class EntityPart(data: ByteString) extends Output
|
||||
final case class ParseError(info: ErrorInfo) extends Output
|
||||
|
||||
|
|
|
|||
|
|
@ -5,14 +5,14 @@
|
|||
package akka.http.rendering
|
||||
|
||||
import java.nio.charset.Charset
|
||||
import org.reactivestreams.api.Producer
|
||||
import org.reactivestreams.Publisher
|
||||
import scala.annotation.tailrec
|
||||
import akka.event.LoggingAdapter
|
||||
import akka.http.model._
|
||||
import akka.http.model.headers._
|
||||
import akka.http.rendering.RenderSupport._
|
||||
import akka.http.util._
|
||||
import akka.stream.impl.SynchronousProducerFromIterable
|
||||
import akka.stream.impl.SynchronousPublisherFromIterable
|
||||
import akka.stream.scaladsl.Flow
|
||||
import akka.stream.{ FlowMaterializer, Transformer }
|
||||
import akka.util.ByteString
|
||||
|
|
@ -25,11 +25,11 @@ private[http] class BodyPartRenderer(boundary: String,
|
|||
nioCharset: Charset,
|
||||
partHeadersSizeHint: Int,
|
||||
materializer: FlowMaterializer,
|
||||
log: LoggingAdapter) extends Transformer[BodyPart, Producer[ChunkStreamPart]] {
|
||||
log: LoggingAdapter) extends Transformer[BodyPart, Publisher[ChunkStreamPart]] {
|
||||
|
||||
private[this] var firstBoundaryRendered = false
|
||||
|
||||
def onNext(bodyPart: BodyPart): List[Producer[ChunkStreamPart]] = {
|
||||
def onNext(bodyPart: BodyPart): List[Publisher[ChunkStreamPart]] = {
|
||||
val r = new CustomCharsetByteStringRendering(nioCharset, partHeadersSizeHint)
|
||||
|
||||
def renderBoundary(): Unit = {
|
||||
|
|
@ -61,20 +61,20 @@ private[http] class BodyPartRenderer(boundary: String,
|
|||
case Nil ⇒ r ~~ CrLf
|
||||
}
|
||||
|
||||
def bodyPartChunks(data: Producer[ByteString]): List[Producer[ChunkStreamPart]] = {
|
||||
val entityChunks = Flow(data).map[ChunkStreamPart](Chunk(_)).toProducer(materializer)
|
||||
Flow[ChunkStreamPart](Chunk(r.get) :: Nil).concat(entityChunks).toProducer(materializer) :: Nil
|
||||
def bodyPartChunks(data: Publisher[ByteString]): List[Publisher[ChunkStreamPart]] = {
|
||||
val entityChunks = Flow(data).map[ChunkStreamPart](Chunk(_)).toPublisher(materializer)
|
||||
Flow[ChunkStreamPart](Chunk(r.get) :: Nil).concat(entityChunks).toPublisher(materializer) :: Nil
|
||||
}
|
||||
|
||||
def completePartRendering(): List[Producer[ChunkStreamPart]] =
|
||||
def completePartRendering(): List[Publisher[ChunkStreamPart]] =
|
||||
bodyPart.entity match {
|
||||
case x if x.isKnownEmpty ⇒ chunkStream(r.get)
|
||||
case Strict(_, data) ⇒ chunkStream((r ~~ data).get)
|
||||
case Default(_, _, data) ⇒ bodyPartChunks(data)
|
||||
case CloseDelimited(_, data) ⇒ bodyPartChunks(data)
|
||||
case Chunked(_, chunks) ⇒
|
||||
val entityChunks = Flow(chunks).filter(!_.isLastChunk).toProducer(materializer)
|
||||
Flow(Chunk(r.get) :: Nil).concat(entityChunks).toProducer(materializer) :: Nil
|
||||
val entityChunks = Flow(chunks).filter(!_.isLastChunk).toPublisher(materializer)
|
||||
Flow(Chunk(r.get) :: Nil).concat(entityChunks).toPublisher(materializer) :: Nil
|
||||
}
|
||||
|
||||
renderBoundary()
|
||||
|
|
@ -84,7 +84,7 @@ private[http] class BodyPartRenderer(boundary: String,
|
|||
completePartRendering()
|
||||
}
|
||||
|
||||
override def onTermination(e: Option[Throwable]): List[Producer[ChunkStreamPart]] =
|
||||
override def onTermination(e: Option[Throwable]): List[Publisher[ChunkStreamPart]] =
|
||||
if (e.isEmpty && firstBoundaryRendered) {
|
||||
val r = new ByteStringRendering(boundary.length + 4)
|
||||
r ~~ CrLf ~~ '-' ~~ '-' ~~ boundary ~~ '-' ~~ '-'
|
||||
|
|
@ -92,6 +92,6 @@ private[http] class BodyPartRenderer(boundary: String,
|
|||
} else Nil
|
||||
|
||||
private def chunkStream(byteString: ByteString) =
|
||||
SynchronousProducerFromIterable[ChunkStreamPart](Chunk(byteString) :: Nil) :: Nil
|
||||
SynchronousPublisherFromIterable[ChunkStreamPart](Chunk(byteString) :: Nil) :: Nil
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -6,7 +6,7 @@ package akka.http.util
|
|||
|
||||
import java.nio.CharBuffer
|
||||
import java.nio.charset.Charset
|
||||
import java.text.{DecimalFormatSymbols, DecimalFormat}
|
||||
import java.text.{ DecimalFormatSymbols, DecimalFormat }
|
||||
import java.util.Locale
|
||||
import scala.annotation.tailrec
|
||||
import scala.collection.{ immutable, LinearSeq }
|
||||
|
|
|
|||
|
|
@ -40,7 +40,7 @@ trait MultipartMarshallers {
|
|||
Marshaller.withOpenCharset(mediaTypeWithBoundary) { (value, charset) ⇒
|
||||
val log = actorSystem(refFactory).log
|
||||
val bodyPartRenderer = new BodyPartRenderer(boundary, charset.nioCharset, partHeadersSizeHint = 128, fm, log)
|
||||
val chunks = Flow(value.parts).transform(bodyPartRenderer).flatten(FlattenStrategy.concat).toProducer(fm)
|
||||
val chunks = Flow(value.parts).transform(bodyPartRenderer).flatten(FlattenStrategy.concat).toPublisher(fm)
|
||||
HttpEntity.Chunked(ContentType(mediaTypeWithBoundary), chunks)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -4,7 +4,7 @@
|
|||
|
||||
package akka.http.unmarshalling
|
||||
|
||||
import org.reactivestreams.api.Producer
|
||||
import org.reactivestreams.Publisher
|
||||
import scala.concurrent.Future
|
||||
import akka.actor.ActorRefFactory
|
||||
import akka.http.parsing.BodyPartParser
|
||||
|
|
@ -31,7 +31,7 @@ trait MultipartUnmarshallers {
|
|||
multipartPartsUnmarshaller[MultipartByteRanges](`multipart/byteranges`,
|
||||
ContentTypes.`text/plain` withCharset defaultCharset)(MultipartByteRanges(_))
|
||||
|
||||
def multipartPartsUnmarshaller[T <: MultipartParts](mediaRange: MediaRange, defaultContentType: ContentType)(create: Producer[BodyPart] ⇒ T)(implicit fm: FlowMaterializer,
|
||||
def multipartPartsUnmarshaller[T <: MultipartParts](mediaRange: MediaRange, defaultContentType: ContentType)(create: Publisher[BodyPart] ⇒ T)(implicit fm: FlowMaterializer,
|
||||
refFactory: ActorRefFactory): FromEntityUnmarshaller[T] =
|
||||
Unmarshaller { entity ⇒
|
||||
Future.successful {
|
||||
|
|
@ -46,7 +46,7 @@ trait MultipartUnmarshallers {
|
|||
.collect {
|
||||
case (BodyPartParser.BodyPartStart(headers, createEntity), entityParts) ⇒
|
||||
BodyPart(createEntity(entityParts), headers)
|
||||
}.toProducer(fm)
|
||||
}.toPublisher(fm)
|
||||
Unmarshalling.Success(create(bodyParts))
|
||||
}
|
||||
} else Unmarshalling.UnsupportedContentType(ContentTypeRange(mediaRange) :: Nil)
|
||||
|
|
@ -60,7 +60,7 @@ trait MultipartUnmarshallers {
|
|||
refFactory: ActorRefFactory): FromEntityUnmarshaller[MultipartFormData] =
|
||||
multipartPartsUnmarshaller(`multipart/form-data`, ContentTypes.`application/octet-stream`) { bodyParts ⇒
|
||||
def verify(part: BodyPart): BodyPart = part // TODO
|
||||
val parts = if (strict) Flow(bodyParts).map(verify).toProducer(fm) else bodyParts
|
||||
val parts = if (strict) Flow(bodyParts).map(verify).toPublisher(fm) else bodyParts
|
||||
MultipartFormData(parts)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue