!str #15551 Switch to implicit passing of FlowMaterializer

* implicit param
* change tests to use implicit materializer
* add ActorFlow trait that provides an implicit materializer inside an actor
  in the right way, i.e. encourage usage of that instead of
  implicit def mat(implicit arf: ActorRef): FlowMaterializer
* make http compile, but those who know the api better will have to adjust
  to take full advantage of the implicit materializer
This commit is contained in:
Patrik Nordwall 2014-08-21 08:38:24 +02:00
parent fa243e656b
commit 1bb5d37780
71 changed files with 447 additions and 375 deletions

View file

@ -61,11 +61,11 @@ private[http] class HttpManager(httpSettings: HttpExt#Settings) extends Actor wi
tcpServerBindingFuture onComplete {
case Success(StreamTcp.TcpServerBinding(localAddress, connectionStream))
log.info("Bound to {}", endpoint)
val materializer = FlowMaterializer(materializerSettings)
implicit val materializer = FlowMaterializer(materializerSettings)
val httpServerPipeline = new HttpServerPipeline(effectiveSettings, materializer, log)
val httpConnectionStream = Flow(connectionStream)
.map(httpServerPipeline)
.toPublisher(materializer)
.toPublisher()
commander ! Http.ServerBinding(localAddress, httpConnectionStream)
case Failure(error)

View file

@ -38,7 +38,7 @@ private[http] class HttpClientPipeline(effectiveSettings: ClientConnectionSettin
val requestMethodByPass = new RequestMethodByPass(tcpConn.remoteAddress)
val (contextBypassSubscriber, contextBypassPublisher) =
Duct[(HttpRequest, Any)].map(_._2).build(materializer)
Duct[(HttpRequest, Any)].map(_._2).build()(materializer)
val requestSubscriber =
Duct[(HttpRequest, Any)]
@ -47,7 +47,7 @@ private[http] class HttpClientPipeline(effectiveSettings: ClientConnectionSettin
.transform(responseRendererFactory.newRenderer)
.flatten(FlattenStrategy.concat)
.transform(errorLogger(log, "Outgoing request stream error"))
.produceTo(tcpConn.outputStream, materializer)
.produceTo(tcpConn.outputStream)(materializer)
val responsePublisher =
Flow(tcpConn.inputStream)
@ -59,7 +59,7 @@ private[http] class HttpClientPipeline(effectiveSettings: ClientConnectionSettin
HttpResponse(statusCode, headers, createEntity(entityParts), protocol)
}
.zip(contextBypassPublisher)
.toPublisher(materializer)
.toPublisher()(materializer)
val processor = HttpClientProcessor(requestSubscriber, responsePublisher)
Http.OutgoingConnection(tcpConn.remoteAddress, tcpConn.localAddress, processor)

View file

@ -59,7 +59,7 @@ sealed trait HttpEntity extends japi.HttpEntity {
throw new java.util.concurrent.TimeoutException(
s"HttpEntity.toStrict timed out after $timeout while still waiting for outstanding data")
})
.toFuture(materializer)
.toFuture()(materializer)
/**
* Creates a copy of this HttpEntity with the `contentType` overridden with the given one.
@ -170,7 +170,7 @@ object HttpEntity {
override def isChunked: Boolean = true
def dataBytes(materializer: FlowMaterializer): Publisher[ByteString] =
Flow(chunks).map(_.data).filter(_.nonEmpty).toPublisher(materializer)
Flow(chunks).map(_.data).filter(_.nonEmpty).toPublisher()(materializer)
def withContentType(contentType: ContentType): Chunked =
if (contentType == this.contentType) this else copy(contentType = contentType)
@ -186,7 +186,7 @@ object HttpEntity {
def apply(contentType: ContentType, chunks: Publisher[ByteString], materializer: FlowMaterializer): Chunked =
Chunked(contentType, Flow(chunks).collect[ChunkStreamPart] {
case b: ByteString if b.nonEmpty Chunk(b)
}.toPublisher(materializer))
}.toPublisher()(materializer))
}
/**

View file

@ -57,7 +57,7 @@ case class MultipartFormData(parts: Publisher[BodyPart]) extends MultipartParts
* hint.
*/
def toStrict(materializer: FlowMaterializer, maxFieldCount: Int = 1000)(implicit ec: ExecutionContext): Future[StrictMultipartFormData] =
Flow(parts).grouped(maxFieldCount).toFuture(materializer).map(new StrictMultipartFormData(_))
Flow(parts).grouped(maxFieldCount).toFuture()(materializer).map(new StrictMultipartFormData(_))
}
/**

View file

@ -145,7 +145,7 @@ private[http] final class BodyPartParser(defaultContentType: ContentType,
emitPartChunk: (List[HttpHeader], ContentType, ByteString) Unit = {
(headers, ct, bytes)
emit(BodyPartStart(headers, entityParts HttpEntity.CloseDelimited(ct,
Flow(entityParts).collect { case EntityPart(data) data }.toPublisher(materializer))))
Flow(entityParts).collect { case EntityPart(data) data }.toPublisher()(materializer))))
emit(bytes)
},
emitFinalPartChunk: (List[HttpHeader], ContentType, ByteString) Unit = {

View file

@ -240,13 +240,13 @@ private[http] abstract class HttpMessageParser[Output >: ParserOutput.MessageOut
def defaultEntity(cth: Option[`Content-Type`], contentLength: Long,
materializer: FlowMaterializer)(entityParts: Publisher[_ <: ParserOutput]): HttpEntity.Regular = {
val data = Flow(entityParts).collect { case ParserOutput.EntityPart(bytes) bytes }.toPublisher(materializer)
val data = Flow(entityParts).collect { case ParserOutput.EntityPart(bytes) bytes }.toPublisher()(materializer)
HttpEntity.Default(contentType(cth), contentLength, data)
}
def chunkedEntity(cth: Option[`Content-Type`],
materializer: FlowMaterializer)(entityChunks: Publisher[_ <: ParserOutput]): HttpEntity.Regular = {
val chunks = Flow(entityChunks).collect { case ParserOutput.EntityChunk(chunk) chunk }.toPublisher(materializer)
val chunks = Flow(entityChunks).collect { case ParserOutput.EntityChunk(chunk) chunk }.toPublisher()(materializer)
HttpEntity.Chunked(contentType(cth), chunks)
}
}

View file

@ -100,7 +100,7 @@ private[http] class HttpResponseParser(_settings: ParserSettings,
}
case None
emitResponseStart { entityParts
val data = Flow(entityParts).collect { case ParserOutput.EntityPart(bytes) bytes }.toPublisher(materializer)
val data = Flow(entityParts).collect { case ParserOutput.EntityPart(bytes) bytes }.toPublisher()(materializer)
HttpEntity.CloseDelimited(contentType(cth), data)
}
parseToCloseBody(input, bodyStart)

View file

@ -62,8 +62,8 @@ private[http] class BodyPartRenderer(boundary: String,
}
def bodyPartChunks(data: Publisher[ByteString]): List[Publisher[ChunkStreamPart]] = {
val entityChunks = Flow(data).map[ChunkStreamPart](Chunk(_)).toPublisher(materializer)
Flow[ChunkStreamPart](Chunk(r.get) :: Nil).concat(entityChunks).toPublisher(materializer) :: Nil
val entityChunks = Flow(data).map[ChunkStreamPart](Chunk(_)).toPublisher()(materializer)
Flow[ChunkStreamPart](Chunk(r.get) :: Nil).concat(entityChunks).toPublisher()(materializer) :: Nil
}
def completePartRendering(): List[Publisher[ChunkStreamPart]] =
@ -73,8 +73,8 @@ private[http] class BodyPartRenderer(boundary: String,
case Default(_, _, data) bodyPartChunks(data)
case CloseDelimited(_, data) bodyPartChunks(data)
case Chunked(_, chunks)
val entityChunks = Flow(chunks).filter(!_.isLastChunk).toPublisher(materializer)
Flow(Chunk(r.get) :: Nil).concat(entityChunks).toPublisher(materializer) :: Nil
val entityChunks = Flow(chunks).filter(!_.isLastChunk).toPublisher()(materializer)
Flow(Chunk(r.get) :: Nil).concat(entityChunks).toPublisher()(materializer) :: Nil
}
renderBoundary()

View file

@ -106,12 +106,12 @@ private[http] class HttpRequestRendererFactory(userAgentHeader: Option[headers.`
case HttpEntity.Default(_, contentLength, data)
renderContentLength(contentLength)
renderByteStrings(r,
Flow(data).transform(new CheckContentLengthTransformer(contentLength)).toPublisher(materializer),
Flow(data).transform(new CheckContentLengthTransformer(contentLength)).toPublisher()(materializer),
materializer)
case HttpEntity.Chunked(_, chunks)
r ~~ `Transfer-Encoding` ~~ ChunkedBytes ~~ CrLf ~~ CrLf
renderByteStrings(r, Flow(chunks).transform(new ChunkTransformer).toPublisher(materializer), materializer)
renderByteStrings(r, Flow(chunks).transform(new ChunkTransformer).toPublisher()(materializer), materializer)
}
renderRequestLine()

View file

@ -132,7 +132,7 @@ private[http] class HttpResponseRendererFactory(serverHeader: Option[headers.Ser
renderHeaders(headers.toList)
renderEntityContentType(r, entity)
r ~~ `Content-Length` ~~ contentLength ~~ CrLf ~~ CrLf
byteStrings(Flow(data).transform(new CheckContentLengthTransformer(contentLength)).toPublisher(materializer))
byteStrings(Flow(data).transform(new CheckContentLengthTransformer(contentLength)).toPublisher()(materializer))
case HttpEntity.CloseDelimited(_, data)
renderHeaders(headers.toList, alwaysClose = true)
@ -142,14 +142,14 @@ private[http] class HttpResponseRendererFactory(serverHeader: Option[headers.Ser
case HttpEntity.Chunked(contentType, chunks)
if (ctx.requestProtocol == `HTTP/1.0`)
completeResponseRendering(HttpEntity.CloseDelimited(contentType, Flow(chunks).map(_.data).toPublisher(materializer)))
completeResponseRendering(HttpEntity.CloseDelimited(contentType, Flow(chunks).map(_.data).toPublisher()(materializer)))
else {
renderHeaders(headers.toList)
renderEntityContentType(r, entity)
if (!entity.isKnownEmpty || ctx.requestMethod == HttpMethods.HEAD)
r ~~ `Transfer-Encoding` ~~ ChunkedBytes ~~ CrLf
r ~~ CrLf
byteStrings(Flow(chunks).transform(new ChunkTransformer).toPublisher(materializer))
byteStrings(Flow(chunks).transform(new ChunkTransformer).toPublisher()(materializer))
}
}

View file

@ -38,7 +38,7 @@ private object RenderSupport {
skipEntity: Boolean = false): List[Publisher[ByteString]] = {
val messageStart = SynchronousPublisherFromIterable(r.get :: Nil)
val messageBytes =
if (!skipEntity) Flow(messageStart).concat(entityBytes).toPublisher(materializer)
if (!skipEntity) Flow(messageStart).concat(entityBytes).toPublisher()(materializer)
else messageStart
messageBytes :: Nil
}

View file

@ -36,7 +36,7 @@ private[http] class HttpServerPipeline(settings: ServerSettings,
val (applicationBypassSubscriber, applicationBypassPublisher) =
Duct[(RequestOutput, Publisher[RequestOutput])]
.collect[MessageStart with RequestOutput] { case (x: MessageStart, _) x }
.build(materializer)
.build()(materializer)
val requestPublisher =
Flow(tcpConn.inputStream)
@ -53,7 +53,7 @@ private[http] class HttpServerPipeline(settings: ServerSettings,
val effectiveUri = HttpRequest.effectiveUri(uri, headers, securedConnection = false, settings.defaultHostHeader)
HttpRequest(method, effectiveUri, headers, createEntity(entityParts), protocol)
}
.toPublisher(materializer)
.toPublisher()(materializer)
val responseSubscriber =
Duct[HttpResponse]
@ -62,7 +62,7 @@ private[http] class HttpServerPipeline(settings: ServerSettings,
.transform(responseRendererFactory.newRenderer)
.flatten(FlattenStrategy.concat)
.transform(errorLogger(log, "Outgoing response stream error"))
.produceTo(tcpConn.outputStream, materializer)
.produceTo(tcpConn.outputStream)(materializer)
Http.IncomingConnection(tcpConn.remoteAddress, requestPublisher, responseSubscriber)
}

View file

@ -31,7 +31,7 @@ package object util {
private[http] implicit class FlowWithHeadAndTail[T](val underlying: Flow[Publisher[T]]) extends AnyVal {
def headAndTail(materializer: FlowMaterializer): Flow[(T, Publisher[T])] =
underlying.map { p
Flow(p).prefixAndTail(1).map { case (prefix, tail) (prefix.head, tail) }.toPublisher(materializer)
Flow(p).prefixAndTail(1).map { case (prefix, tail) (prefix.head, tail) }.toPublisher()(materializer)
}.flatten(FlattenStrategy.Concat())
}

View file

@ -93,7 +93,7 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll {
private val HttpRequest(POST, uri, List(`User-Agent`(_), Host(_, _), Accept(Vector(MediaRanges.`*/*`))),
Chunked(`chunkedContentType`, chunkStream), HttpProtocols.`HTTP/1.1`) = serverIn.expectNext()
uri shouldEqual Uri(s"http://$hostname:$port/chunked")
Await.result(Flow(chunkStream).grouped(4).toFuture(materializer), 100.millis) shouldEqual chunks
Await.result(Flow(chunkStream).grouped(4).toFuture()(materializer), 100.millis) shouldEqual chunks
val serverOutSub = serverOut.expectSubscription()
serverOutSub.sendNext(HttpResponse(206, List(RawHeader("Age", "42")), chunkedEntity))
@ -102,7 +102,7 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll {
clientInSub.request(1)
val (HttpResponse(StatusCodes.PartialContent, List(Date(_), Server(_), RawHeader("Age", "42")),
Chunked(`chunkedContentType`, chunkStream2), HttpProtocols.`HTTP/1.1`), 12345678) = clientIn.expectNext()
Await.result(Flow(chunkStream2).grouped(1000).toFuture(materializer), 100.millis) shouldEqual chunks
Await.result(Flow(chunkStream2).grouped(1000).toFuture()(materializer), 100.millis) shouldEqual chunks
}
}

View file

@ -25,7 +25,7 @@ object TestClient extends App {
implicit val system = ActorSystem("ServerTest", testConf)
import system.dispatcher
val materializer = FlowMaterializer(MaterializerSettings())
implicit val materializer = FlowMaterializer(MaterializerSettings())
implicit val askTimeout: Timeout = 500.millis
val host = "spray.io"
@ -37,8 +37,8 @@ object TestClient extends App {
} yield response.header[headers.Server]
def sendRequest(request: HttpRequest, connection: Http.OutgoingConnection): Future[HttpResponse] = {
Flow(List(HttpRequest() -> 'NoContext)).produceTo(connection.processor, materializer)
Flow(connection.processor).map(_._1).toFuture(materializer)
Flow(List(HttpRequest() -> 'NoContext)).produceTo(connection.processor)
Flow(connection.processor).map(_._1).toFuture()
}
result onComplete {

View file

@ -30,17 +30,17 @@ object TestServer extends App {
case _: HttpRequest HttpResponse(404, entity = "Unknown resource!")
}
val materializer = FlowMaterializer(MaterializerSettings())
implicit val materializer = FlowMaterializer(MaterializerSettings())
implicit val askTimeout: Timeout = 500.millis
val bindingFuture = IO(Http) ? Http.Bind(interface = "localhost", port = 8080)
bindingFuture foreach {
case Http.ServerBinding(localAddress, connectionStream)
Flow(connectionStream).foreach({
Flow(connectionStream).foreach {
case Http.IncomingConnection(remoteAddress, requestPublisher, responseSubscriber)
println("Accepted new connection from " + remoteAddress)
Flow(requestPublisher).map(requestHandler).produceTo(responseSubscriber, materializer)
}, materializer)
Flow(requestPublisher).map(requestHandler).produceTo(responseSubscriber)
}
}
println(s"Server online at http://localhost:8080/\nPress RETURN to stop...")

View file

@ -78,7 +78,7 @@ class HttpEntitySpec extends FreeSpec with MustMatchers with BeforeAndAfterAll {
}
"Infinite data stream" in {
val neverCompleted = Promise[ByteString]()
val stream: Publisher[ByteString] = Flow(neverCompleted.future).toPublisher(materializer)
val stream: Publisher[ByteString] = Flow(neverCompleted.future).toPublisher()(materializer)
intercept[TimeoutException] {
Await.result(Default(tpe, 42, stream).toStrict(100.millis, materializer), 150.millis)
}.getMessage must be("HttpEntity.toStrict timed out after 100 milliseconds while still waiting for outstanding data")
@ -92,7 +92,7 @@ class HttpEntitySpec extends FreeSpec with MustMatchers with BeforeAndAfterAll {
equal(bytes.toVector).matcher[Seq[ByteString]].compose { entity
val future =
Flow(entity.dataBytes(materializer))
.grouped(1000).toFuture(materializer)
.grouped(1000).toFuture()(materializer)
Await.result(future, 250.millis)
}

View file

@ -369,10 +369,10 @@ class RequestParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll {
case Right(request) compactEntity(request.entity).map(x Right(request.withEntity(x)))
case Left(error) Future.successful(Left(error))
}
}.toPublisher(materializer)
}.toPublisher()(materializer)
}
.flatten(FlattenStrategy.concat)
.grouped(1000).toFuture(materializer)
.grouped(1000).toFuture()(materializer)
Await.result(future, 250.millis)
}
@ -385,7 +385,7 @@ class RequestParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll {
}
private def compactEntityChunks(data: Publisher[ChunkStreamPart]): Future[Publisher[ChunkStreamPart]] =
Flow(data).grouped(1000).toFuture(materializer)
Flow(data).grouped(1000).toFuture()(materializer)
.map(publisher(_: _*))
.recover { case _: NoSuchElementException publisher[ChunkStreamPart]() }

View file

@ -227,10 +227,10 @@ class ResponseParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll {
case Right(response) compactEntity(response.entity).map(x Right(response.withEntity(x)))
case Left(error) Future.successful(Left(error.info.formatPretty))
}
}.toPublisher(materializer)
}.toPublisher()(materializer)
}
.flatten(FlattenStrategy.concat)
.grouped(1000).toFuture(materializer)
.grouped(1000).toFuture()(materializer)
Await.result(future, 250.millis)
}
@ -247,7 +247,7 @@ class ResponseParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll {
}
private def compactEntityChunks(data: Publisher[ChunkStreamPart]): Future[Publisher[ChunkStreamPart]] =
Flow(data).grouped(1000).toFuture(materializer)
Flow(data).grouped(1000).toFuture()(materializer)
.map(publisher(_: _*))
.recover {
case _: NoSuchElementException publisher[ChunkStreamPart]()

View file

@ -190,7 +190,7 @@ class RequestRendererSpec extends FreeSpec with Matchers with BeforeAndAfterAll
equal(expected.stripMarginWithNewline("\r\n")).matcher[String] compose { request
val renderer = newRenderer
val byteStringPublisher :: Nil = renderer.onNext(RequestRenderingContext(request, serverAddress))
val future = Flow(byteStringPublisher).grouped(1000).toFuture(materializer).map(_.reduceLeft(_ ++ _).utf8String)
val future = Flow(byteStringPublisher).grouped(1000).toFuture()(materializer).map(_.reduceLeft(_ ++ _).utf8String)
Await.result(future, 250.millis)
}
}

View file

@ -337,7 +337,7 @@ class ResponseRendererSpec extends FreeSpec with Matchers with BeforeAndAfterAll
equal(expected.stripMarginWithNewline("\r\n") -> close).matcher[(String, Boolean)] compose { ctx
val renderer = newRenderer
val byteStringPublisher :: Nil = renderer.onNext(ctx)
val future = Flow(byteStringPublisher).grouped(1000).toFuture(materializer).map(_.reduceLeft(_ ++ _).utf8String)
val future = Flow(byteStringPublisher).grouped(1000).toFuture()(materializer).map(_.reduceLeft(_ ++ _).utf8String)
Await.result(future, 250.millis) -> renderer.isComplete
}