diff --git a/akka-http-core/src/main/scala/akka/http/model/HttpEntity.scala b/akka-http-core/src/main/scala/akka/http/model/HttpEntity.scala index 392bfeb26c..570eef4375 100644 --- a/akka-http-core/src/main/scala/akka/http/model/HttpEntity.scala +++ b/akka-http-core/src/main/scala/akka/http/model/HttpEntity.scala @@ -181,7 +181,7 @@ object HttpEntity { */ def apply(contentType: ContentType, chunks: Publisher[ByteString], materializer: FlowMaterializer): Chunked = Chunked(contentType, Flow(chunks).collect[ChunkStreamPart] { - case b: ByteString if b.nonEmpty => Chunk(b) + case b: ByteString if b.nonEmpty ⇒ Chunk(b) }.toPublisher(materializer)) } diff --git a/akka-http-core/src/main/scala/akka/http/model/MultipartContent.scala b/akka-http-core/src/main/scala/akka/http/model/MultipartContent.scala index 4b804d2750..8a51cbc92f 100644 --- a/akka-http-core/src/main/scala/akka/http/model/MultipartContent.scala +++ b/akka-http-core/src/main/scala/akka/http/model/MultipartContent.scala @@ -46,14 +46,14 @@ object MultipartByteRanges { * All parts must contain a Content-Disposition header with a type form-data * and a name parameter that is unique. */ -final case class MultipartFormData(parts: Producer[BodyPart]) extends MultipartParts { +final case class MultipartFormData(parts: Publisher[BodyPart]) extends MultipartParts { // def get(partName: String): Option[BodyPart] = fields.find(_.name.exists(_ == partName)) } object MultipartFormData { val Empty = MultipartFormData() - def apply(parts: BodyPart*): MultipartFormData = apply(SynchronousProducerFromIterable[BodyPart](parts.toList)) + def apply(parts: BodyPart*): MultipartFormData = apply(SynchronousPublisherFromIterable[BodyPart](parts.toList)) def apply(fields: Map[String, BodyPart]): MultipartFormData = apply { fields.map { diff --git a/akka-http-core/src/main/scala/akka/http/parsing/BodyPartParser.scala b/akka-http-core/src/main/scala/akka/http/parsing/BodyPartParser.scala index a64c61a67d..2a1a45f135 100644 --- a/akka-http-core/src/main/scala/akka/http/parsing/BodyPartParser.scala +++ b/akka-http-core/src/main/scala/akka/http/parsing/BodyPartParser.scala @@ -5,7 +5,7 @@ package akka.http.parsing import akka.event.LoggingAdapter -import org.reactivestreams.api.Producer +import org.reactivestreams.Publisher import scala.annotation.tailrec import scala.collection.immutable import scala.collection.mutable.ListBuffer @@ -141,7 +141,7 @@ private[http] final class BodyPartParser(defaultContentType: ContentType, emitPartChunk: (List[HttpHeader], ContentType, ByteString) ⇒ Unit = { (headers, ct, bytes) ⇒ emit(BodyPartStart(headers, entityParts ⇒ HttpEntity.CloseDelimited(ct, - Flow(entityParts).collect { case EntityPart(data) ⇒ data }.toProducer(materializer)))) + Flow(entityParts).collect { case EntityPart(data) ⇒ data }.toPublisher(materializer)))) emit(bytes) }, emitFinalPartChunk: (List[HttpHeader], ContentType, ByteString) ⇒ Unit = { @@ -214,7 +214,7 @@ private[http] object BodyPartParser { val boundaryCharNoSpace = CharPredicate.Digit ++ CharPredicate.Alpha ++ "'()+_,-./:=?" sealed trait Output - final case class BodyPartStart(headers: List[HttpHeader], createEntity: Producer[Output] ⇒ HttpEntity) extends Output + final case class BodyPartStart(headers: List[HttpHeader], createEntity: Publisher[Output] ⇒ HttpEntity) extends Output final case class EntityPart(data: ByteString) extends Output final case class ParseError(info: ErrorInfo) extends Output diff --git a/akka-http-core/src/main/scala/akka/http/rendering/BodyPartRenderer.scala b/akka-http-core/src/main/scala/akka/http/rendering/BodyPartRenderer.scala index d5a71c2af2..1a5b7162eb 100644 --- a/akka-http-core/src/main/scala/akka/http/rendering/BodyPartRenderer.scala +++ b/akka-http-core/src/main/scala/akka/http/rendering/BodyPartRenderer.scala @@ -5,14 +5,14 @@ package akka.http.rendering import java.nio.charset.Charset -import org.reactivestreams.api.Producer +import org.reactivestreams.Publisher import scala.annotation.tailrec import akka.event.LoggingAdapter import akka.http.model._ import akka.http.model.headers._ import akka.http.rendering.RenderSupport._ import akka.http.util._ -import akka.stream.impl.SynchronousProducerFromIterable +import akka.stream.impl.SynchronousPublisherFromIterable import akka.stream.scaladsl.Flow import akka.stream.{ FlowMaterializer, Transformer } import akka.util.ByteString @@ -25,11 +25,11 @@ private[http] class BodyPartRenderer(boundary: String, nioCharset: Charset, partHeadersSizeHint: Int, materializer: FlowMaterializer, - log: LoggingAdapter) extends Transformer[BodyPart, Producer[ChunkStreamPart]] { + log: LoggingAdapter) extends Transformer[BodyPart, Publisher[ChunkStreamPart]] { private[this] var firstBoundaryRendered = false - def onNext(bodyPart: BodyPart): List[Producer[ChunkStreamPart]] = { + def onNext(bodyPart: BodyPart): List[Publisher[ChunkStreamPart]] = { val r = new CustomCharsetByteStringRendering(nioCharset, partHeadersSizeHint) def renderBoundary(): Unit = { @@ -61,20 +61,20 @@ private[http] class BodyPartRenderer(boundary: String, case Nil ⇒ r ~~ CrLf } - def bodyPartChunks(data: Producer[ByteString]): List[Producer[ChunkStreamPart]] = { - val entityChunks = Flow(data).map[ChunkStreamPart](Chunk(_)).toProducer(materializer) - Flow[ChunkStreamPart](Chunk(r.get) :: Nil).concat(entityChunks).toProducer(materializer) :: Nil + def bodyPartChunks(data: Publisher[ByteString]): List[Publisher[ChunkStreamPart]] = { + val entityChunks = Flow(data).map[ChunkStreamPart](Chunk(_)).toPublisher(materializer) + Flow[ChunkStreamPart](Chunk(r.get) :: Nil).concat(entityChunks).toPublisher(materializer) :: Nil } - def completePartRendering(): List[Producer[ChunkStreamPart]] = + def completePartRendering(): List[Publisher[ChunkStreamPart]] = bodyPart.entity match { case x if x.isKnownEmpty ⇒ chunkStream(r.get) case Strict(_, data) ⇒ chunkStream((r ~~ data).get) case Default(_, _, data) ⇒ bodyPartChunks(data) case CloseDelimited(_, data) ⇒ bodyPartChunks(data) case Chunked(_, chunks) ⇒ - val entityChunks = Flow(chunks).filter(!_.isLastChunk).toProducer(materializer) - Flow(Chunk(r.get) :: Nil).concat(entityChunks).toProducer(materializer) :: Nil + val entityChunks = Flow(chunks).filter(!_.isLastChunk).toPublisher(materializer) + Flow(Chunk(r.get) :: Nil).concat(entityChunks).toPublisher(materializer) :: Nil } renderBoundary() @@ -84,7 +84,7 @@ private[http] class BodyPartRenderer(boundary: String, completePartRendering() } - override def onTermination(e: Option[Throwable]): List[Producer[ChunkStreamPart]] = + override def onTermination(e: Option[Throwable]): List[Publisher[ChunkStreamPart]] = if (e.isEmpty && firstBoundaryRendered) { val r = new ByteStringRendering(boundary.length + 4) r ~~ CrLf ~~ '-' ~~ '-' ~~ boundary ~~ '-' ~~ '-' @@ -92,6 +92,6 @@ private[http] class BodyPartRenderer(boundary: String, } else Nil private def chunkStream(byteString: ByteString) = - SynchronousProducerFromIterable[ChunkStreamPart](Chunk(byteString) :: Nil) :: Nil + SynchronousPublisherFromIterable[ChunkStreamPart](Chunk(byteString) :: Nil) :: Nil } diff --git a/akka-http-core/src/main/scala/akka/http/util/Rendering.scala b/akka-http-core/src/main/scala/akka/http/util/Rendering.scala index c34191aefa..9e1333e78e 100644 --- a/akka-http-core/src/main/scala/akka/http/util/Rendering.scala +++ b/akka-http-core/src/main/scala/akka/http/util/Rendering.scala @@ -6,7 +6,7 @@ package akka.http.util import java.nio.CharBuffer import java.nio.charset.Charset -import java.text.{DecimalFormatSymbols, DecimalFormat} +import java.text.{ DecimalFormatSymbols, DecimalFormat } import java.util.Locale import scala.annotation.tailrec import scala.collection.{ immutable, LinearSeq } diff --git a/akka-http/src/main/scala/akka/http/marshalling/MultipartMarshallers.scala b/akka-http/src/main/scala/akka/http/marshalling/MultipartMarshallers.scala index e763354ebe..6f84d3104b 100644 --- a/akka-http/src/main/scala/akka/http/marshalling/MultipartMarshallers.scala +++ b/akka-http/src/main/scala/akka/http/marshalling/MultipartMarshallers.scala @@ -40,7 +40,7 @@ trait MultipartMarshallers { Marshaller.withOpenCharset(mediaTypeWithBoundary) { (value, charset) ⇒ val log = actorSystem(refFactory).log val bodyPartRenderer = new BodyPartRenderer(boundary, charset.nioCharset, partHeadersSizeHint = 128, fm, log) - val chunks = Flow(value.parts).transform(bodyPartRenderer).flatten(FlattenStrategy.concat).toProducer(fm) + val chunks = Flow(value.parts).transform(bodyPartRenderer).flatten(FlattenStrategy.concat).toPublisher(fm) HttpEntity.Chunked(ContentType(mediaTypeWithBoundary), chunks) } } diff --git a/akka-http/src/main/scala/akka/http/unmarshalling/MultipartUnmarshallers.scala b/akka-http/src/main/scala/akka/http/unmarshalling/MultipartUnmarshallers.scala index 860579e4c0..281661caa2 100644 --- a/akka-http/src/main/scala/akka/http/unmarshalling/MultipartUnmarshallers.scala +++ b/akka-http/src/main/scala/akka/http/unmarshalling/MultipartUnmarshallers.scala @@ -4,7 +4,7 @@ package akka.http.unmarshalling -import org.reactivestreams.api.Producer +import org.reactivestreams.Publisher import scala.concurrent.Future import akka.actor.ActorRefFactory import akka.http.parsing.BodyPartParser @@ -31,8 +31,8 @@ trait MultipartUnmarshallers { multipartPartsUnmarshaller[MultipartByteRanges](`multipart/byteranges`, ContentTypes.`text/plain` withCharset defaultCharset)(MultipartByteRanges(_)) - def multipartPartsUnmarshaller[T <: MultipartParts](mediaRange: MediaRange, defaultContentType: ContentType)(create: Producer[BodyPart] ⇒ T)(implicit fm: FlowMaterializer, - refFactory: ActorRefFactory): FromEntityUnmarshaller[T] = + def multipartPartsUnmarshaller[T <: MultipartParts](mediaRange: MediaRange, defaultContentType: ContentType)(create: Publisher[BodyPart] ⇒ T)(implicit fm: FlowMaterializer, + refFactory: ActorRefFactory): FromEntityUnmarshaller[T] = Unmarshaller { entity ⇒ Future.successful { if (mediaRange matches entity.contentType.mediaType) { @@ -46,7 +46,7 @@ trait MultipartUnmarshallers { .collect { case (BodyPartParser.BodyPartStart(headers, createEntity), entityParts) ⇒ BodyPart(createEntity(entityParts), headers) - }.toProducer(fm) + }.toPublisher(fm) Unmarshalling.Success(create(bodyParts)) } } else Unmarshalling.UnsupportedContentType(ContentTypeRange(mediaRange) :: Nil) @@ -60,7 +60,7 @@ trait MultipartUnmarshallers { refFactory: ActorRefFactory): FromEntityUnmarshaller[MultipartFormData] = multipartPartsUnmarshaller(`multipart/form-data`, ContentTypes.`application/octet-stream`) { bodyParts ⇒ def verify(part: BodyPart): BodyPart = part // TODO - val parts = if (strict) Flow(bodyParts).map(verify).toProducer(fm) else bodyParts + val parts = if (strict) Flow(bodyParts).map(verify).toPublisher(fm) else bodyParts MultipartFormData(parts) } }