!str #19005 make groupBy et al return a SubFlow
A SubFlow (or SubSource) is not a Graph, it is an unfinished builder that accepts transformations. This allows us to capture the substreams’ transformations before materializing the flow, which will be very helpful in fully fusing all operators. Another change is that groupBy now requires a maxSubstreams parameter in order to bound its resource usage. In exchange the matching merge can be unbounded. This trades silent deadlock for explicit stream failure. This commit also changes all uses of Predef.identity to use `conforms` and removes the HTTP impl.util.identityFunc.
This commit is contained in:
parent
654fa41443
commit
1500d1f36d
56 changed files with 3484 additions and 720 deletions
|
|
@ -66,15 +66,15 @@ private[http] object OutgoingConnectionBlueprint {
|
|||
|
||||
import ParserOutput._
|
||||
val responsePrep = Flow[List[ResponseOutput]]
|
||||
.mapConcat(identityFunc)
|
||||
.mapConcat(conforms)
|
||||
.splitWhen(x ⇒ x.isInstanceOf[MessageStart] || x == MessageEnd)
|
||||
.via(headAndTailFlow)
|
||||
.prefixAndTail(1)
|
||||
.collect {
|
||||
case (ResponseStart(statusCode, protocol, headers, createEntity, _), entityParts) ⇒
|
||||
case (Seq(ResponseStart(statusCode, protocol, headers, createEntity, _)), entityParts) ⇒
|
||||
val entity = createEntity(entityParts) withSizeLimit parserSettings.maxContentLength
|
||||
HttpResponse(statusCode, headers, entity, protocol)
|
||||
case (MessageStartError(_, info), _) ⇒ throw IllegalResponseException(info)
|
||||
}
|
||||
case (Seq(MessageStartError(_, info)), _) ⇒ throw IllegalResponseException(info)
|
||||
}.concatSubstreams
|
||||
|
||||
val core = BidiFlow.fromGraph(GraphDSL.create() { implicit b ⇒
|
||||
import GraphDSL.Implicits._
|
||||
|
|
|
|||
|
|
@ -41,15 +41,15 @@ private object PoolSlot {
|
|||
Stream Setup
|
||||
============
|
||||
|
||||
Request- +-----------+ +-------------+ +-------------+ +------------+
|
||||
Request- +-----------+ +-------------+ +-------------+ +------------+
|
||||
Context | Slot- | List[ | flatten | Processor- | doubler | | SlotEvent- | Response-
|
||||
+--------->| Processor +------------->| (MapConcat) +------------->| (MapConcat) +---->| Split +------------->
|
||||
| | Processor- | | Out | | | | Context
|
||||
+-----------+ Out] +-------------+ +-------------+ +-----+------+
|
||||
| RawSlotEvent
|
||||
| | Processor- | | Out | | | | Context
|
||||
+-----------+ Out] +-------------+ +-------------+ +-----+------+
|
||||
| RawSlotEvent
|
||||
| (to Conductor
|
||||
| via slotEventMerge)
|
||||
v
|
||||
v
|
||||
*/
|
||||
def apply(slotIx: Int, connectionFlow: Flow[HttpRequest, HttpResponse, Any],
|
||||
remoteAddress: InetSocketAddress, // TODO: remove after #16168 is cleared
|
||||
|
|
@ -65,7 +65,7 @@ private object PoolSlot {
|
|||
val actor = system.actorOf(Props(new SlotProcessor(slotIx, connectionFlow, settings)).withDeploy(Deploy.local),
|
||||
name)
|
||||
ActorProcessor[RequestContext, List[ProcessorOut]](actor)
|
||||
}.mapConcat(identity)
|
||||
}.mapConcat(conforms)
|
||||
}
|
||||
val split = b.add(Broadcast[ProcessorOut](2))
|
||||
|
||||
|
|
@ -196,7 +196,7 @@ private object PoolSlot {
|
|||
} else {
|
||||
inflightRequests.map { rc ⇒
|
||||
if (rc.retriesLeft == 0) {
|
||||
val reason = error.fold[Throwable](new RuntimeException("Unexpected disconnect"))(identityFunc)
|
||||
val reason = error.fold[Throwable](new RuntimeException("Unexpected disconnect"))(conforms)
|
||||
connInport ! ActorPublisherMessage.Cancel
|
||||
ResponseDelivery(ResponseContext(rc, Failure(reason)))
|
||||
} else SlotEvent.RetryRequest(rc.copy(retriesLeft = rc.retriesLeft - 1))
|
||||
|
|
|
|||
|
|
@ -87,7 +87,9 @@ private[http] object HttpServerBluePrint {
|
|||
BidiFlow.fromFlows(Flow[HttpResponse],
|
||||
Flow[RequestOutput]
|
||||
.splitWhen(x ⇒ x.isInstanceOf[MessageStart] || x == MessageEnd)
|
||||
.via(headAndTailFlow)
|
||||
.prefixAndTail(1)
|
||||
.map(p ⇒ p._1.head -> p._2)
|
||||
.concatSubstreams
|
||||
.via(requestStartOrRunIgnore(settings)))
|
||||
|
||||
def requestStartOrRunIgnore(settings: ServerSettings)(implicit mat: Materializer): Flow[(ParserOutput.RequestOutput, Source[ParserOutput.RequestOutput, Unit]), HttpRequest, Unit] =
|
||||
|
|
|
|||
|
|
@ -21,7 +21,7 @@ private[http] object MessageToFrameRenderer {
|
|||
// FIXME: fragment?
|
||||
Source.single(FrameEvent.fullFrame(opcode, None, data, fin = true))
|
||||
|
||||
def streamedFrames(opcode: Opcode, data: Source[ByteString, _]): Source[FrameStart, _] =
|
||||
def streamedFrames[M](opcode: Opcode, data: Source[ByteString, M]): Source[FrameStart, Unit] =
|
||||
Source.single(FrameEvent.empty(opcode, fin = false)) ++
|
||||
data.map(FrameEvent.fullFrame(Opcode.Continuation, None, _, fin = false)) ++
|
||||
Source.single(FrameEvent.emptyLastContinuationFrame)
|
||||
|
|
|
|||
|
|
@ -85,36 +85,39 @@ private[http] object Websocket {
|
|||
}
|
||||
|
||||
/** Collects user-level API messages from MessageDataParts */
|
||||
val collectMessage: Flow[Source[MessageDataPart, Unit], Message, Unit] =
|
||||
Flow[Source[MessageDataPart, Unit]]
|
||||
.via(headAndTailFlow)
|
||||
val collectMessage: Flow[MessageDataPart, Message, Unit] =
|
||||
Flow[MessageDataPart]
|
||||
.prefixAndTail(1)
|
||||
.map {
|
||||
case (TextMessagePart(text, true), remaining) ⇒
|
||||
TextMessage.Strict(text)
|
||||
case (first @ TextMessagePart(text, false), remaining) ⇒
|
||||
TextMessage(
|
||||
(Source.single(first) ++ remaining)
|
||||
.collect {
|
||||
case t: TextMessagePart if t.data.nonEmpty ⇒ t.data
|
||||
})
|
||||
case (BinaryMessagePart(data, true), remaining) ⇒
|
||||
BinaryMessage.Strict(data)
|
||||
case (first @ BinaryMessagePart(data, false), remaining) ⇒
|
||||
BinaryMessage(
|
||||
(Source.single(first) ++ remaining)
|
||||
.collect {
|
||||
case t: BinaryMessagePart if t.data.nonEmpty ⇒ t.data
|
||||
})
|
||||
case (seq, remaining) ⇒ seq.head match {
|
||||
case TextMessagePart(text, true) ⇒
|
||||
TextMessage.Strict(text)
|
||||
case first @ TextMessagePart(text, false) ⇒
|
||||
TextMessage(
|
||||
(Source.single(first) ++ remaining)
|
||||
.collect {
|
||||
case t: TextMessagePart if t.data.nonEmpty ⇒ t.data
|
||||
})
|
||||
case BinaryMessagePart(data, true) ⇒
|
||||
BinaryMessage.Strict(data)
|
||||
case first @ BinaryMessagePart(data, false) ⇒
|
||||
BinaryMessage(
|
||||
(Source.single(first) ++ remaining)
|
||||
.collect {
|
||||
case t: BinaryMessagePart if t.data.nonEmpty ⇒ t.data
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
def prepareMessages: Flow[MessagePart, Message, Unit] =
|
||||
Flow[MessagePart]
|
||||
.transform(() ⇒ new PrepareForUserHandler)
|
||||
.splitWhen(_.isMessageEnd) // FIXME using splitAfter from #16885 would simplify protocol a lot
|
||||
.map(_.collect {
|
||||
.collect {
|
||||
case m: MessageDataPart ⇒ m
|
||||
})
|
||||
}
|
||||
.via(collectMessage)
|
||||
.concatSubstreams
|
||||
.named("ws-prepare-messages")
|
||||
|
||||
def renderMessages: Flow[Message, FrameStart, Unit] =
|
||||
|
|
|
|||
|
|
@ -47,14 +47,6 @@ package object util {
|
|||
private[http] implicit def enhanceByteStringsMat[Mat](byteStrings: Source[ByteString, Mat]): EnhancedByteStringSource[Mat] =
|
||||
new EnhancedByteStringSource(byteStrings)
|
||||
|
||||
private[http] def headAndTailFlow[T]: Flow[Source[T, Any], (T, Source[T, Unit]), Unit] =
|
||||
Flow[Source[T, Any]]
|
||||
.flatMapConcat {
|
||||
_.prefixAndTail(1)
|
||||
.filter(_._1.nonEmpty)
|
||||
.map { case (prefix, tail) ⇒ (prefix.head, tail) }
|
||||
}
|
||||
|
||||
private[http] def printEvent[T](marker: String): Flow[T, T, Unit] =
|
||||
Flow[T].transform(() ⇒ new PushPullStage[T, T] {
|
||||
override def onPush(element: T, ctx: Context[T]): SyncDirective = {
|
||||
|
|
@ -110,10 +102,6 @@ package object util {
|
|||
}
|
||||
}
|
||||
|
||||
private[this] val _identityFunc: Any ⇒ Any = x ⇒ x
|
||||
/** Returns a constant identity function to avoid allocating the closure */
|
||||
private[http] def identityFunc[T]: T ⇒ T = _identityFunc.asInstanceOf[T ⇒ T]
|
||||
|
||||
private[http] def humanReadableByteCount(bytes: Long, si: Boolean): String = {
|
||||
val unit = if (si) 1000 else 1024
|
||||
if (bytes >= unit) {
|
||||
|
|
|
|||
|
|
@ -475,13 +475,14 @@ class RequestParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll {
|
|||
.map(ByteString.apply)
|
||||
.transform(() ⇒ parser.stage).named("parser")
|
||||
.splitWhen(x ⇒ x.isInstanceOf[MessageStart] || x.isInstanceOf[EntityStreamError])
|
||||
.via(headAndTailFlow)
|
||||
.prefixAndTail(1)
|
||||
.collect {
|
||||
case (RequestStart(method, uri, protocol, headers, createEntity, _, close), entityParts) ⇒
|
||||
case (Seq(RequestStart(method, uri, protocol, headers, createEntity, _, close)), entityParts) ⇒
|
||||
closeAfterResponseCompletion :+= close
|
||||
Right(HttpRequest(method, uri, headers, createEntity(entityParts), protocol))
|
||||
case (x @ (MessageStartError(_, _) | EntityStreamError(_)), _) ⇒ Left(x)
|
||||
case (Seq(x @ (MessageStartError(_, _) | EntityStreamError(_))), _) ⇒ Left(x)
|
||||
}
|
||||
.concatSubstreams
|
||||
.flatMapConcat { x ⇒
|
||||
Source {
|
||||
x match {
|
||||
|
|
|
|||
|
|
@ -293,13 +293,13 @@ class ResponseParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll {
|
|||
.map(ByteString.apply)
|
||||
.transform(() ⇒ newParserStage(requestMethod)).named("parser")
|
||||
.splitWhen(x ⇒ x.isInstanceOf[MessageStart] || x.isInstanceOf[EntityStreamError])
|
||||
.via(headAndTailFlow)
|
||||
.prefixAndTail(1)
|
||||
.collect {
|
||||
case (ResponseStart(statusCode, protocol, headers, createEntity, close), entityParts) ⇒
|
||||
case (Seq(ResponseStart(statusCode, protocol, headers, createEntity, close)), entityParts) ⇒
|
||||
closeAfterResponseCompletion :+= close
|
||||
Right(HttpResponse(statusCode, headers, createEntity(entityParts), protocol))
|
||||
case (x @ (MessageStartError(_, _) | EntityStreamError(_)), _) ⇒ Left(x)
|
||||
}
|
||||
case (Seq(x @ (MessageStartError(_, _) | EntityStreamError(_))), _) ⇒ Left(x)
|
||||
}.concatSubstreams
|
||||
|
||||
def collectBlocking[T](source: Source[T, Any]): Seq[T] =
|
||||
Await.result(source.grouped(100000).runWith(Sink.head), 500.millis)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue