=htc #16055 refactoring: make headAndTail a simple flow

This commit is contained in:
Johannes Rudolph 2015-04-28 16:14:42 +02:00
parent 0f15cb6d78
commit 444a0deef5
6 changed files with 9 additions and 20 deletions

View file

@ -72,7 +72,7 @@ private[http] object OutgoingConnectionBlueprint {
.transform(StreamUtils.recover { case x: ResponseParsingError x.error :: Nil }) // FIXME after #16565 .transform(StreamUtils.recover { case x: ResponseParsingError x.error :: Nil }) // FIXME after #16565
.mapConcat(identityFunc) .mapConcat(identityFunc)
.splitWhen(x x.isInstanceOf[MessageStart] || x == MessageEnd) .splitWhen(x x.isInstanceOf[MessageStart] || x == MessageEnd)
.headAndTail .via(headAndTailFlow)
.collect { .collect {
case (ResponseStart(statusCode, protocol, headers, createEntity, _), entityParts) case (ResponseStart(statusCode, protocol, headers, createEntity, _), entityParts)
HttpResponse(statusCode, headers, createEntity(entityParts), protocol) HttpResponse(statusCode, headers, createEntity(entityParts), protocol)

View file

@ -62,7 +62,7 @@ private[http] object HttpServerBluePrint {
val requestPreparation = val requestPreparation =
Flow[RequestOutput] Flow[RequestOutput]
.splitWhen(x x.isInstanceOf[MessageStart] || x == MessageEnd) .splitWhen(x x.isInstanceOf[MessageStart] || x == MessageEnd)
.headAndTail .via(headAndTailFlow)
.map { .map {
case (RequestStart(method, uri, protocol, headers, createEntity, _, _), entityParts) case (RequestStart(method, uri, protocol, headers, createEntity, _, _), entityParts)
val effectiveUri = HttpRequest.effectiveUri(uri, headers, securedConnection = false, defaultHostHeader) val effectiveUri = HttpRequest.effectiveUri(uri, headers, securedConnection = false, defaultHostHeader)

View file

@ -47,7 +47,7 @@ private[http] object Websocket {
/** Collects user-level API messages from MessageDataParts */ /** Collects user-level API messages from MessageDataParts */
val collectMessage: Flow[Source[MessageDataPart, Unit], Message, Unit] = val collectMessage: Flow[Source[MessageDataPart, Unit], Message, Unit] =
Flow[Source[MessageDataPart, Unit]] Flow[Source[MessageDataPart, Unit]]
.headAndTail .via(headAndTailFlow)
.map { .map {
case (TextMessagePart(text, true), remaining) case (TextMessagePart(text, true), remaining)
TextMessage.Strict(text) TextMessage.Strict(text)

View file

@ -43,25 +43,14 @@ package object util {
private[http] implicit def enhanceByteStrings[Mat](byteStrings: Source[ByteString, Mat]): EnhancedByteStringSource[Mat] = private[http] implicit def enhanceByteStrings[Mat](byteStrings: Source[ByteString, Mat]): EnhancedByteStringSource[Mat] =
new EnhancedByteStringSource(byteStrings) new EnhancedByteStringSource(byteStrings)
private[http] implicit class SourceWithHeadAndTail[T, Mat](val underlying: Source[Source[T, Any], Mat]) extends AnyVal { private[http] def headAndTailFlow[T]: Flow[Source[T, Any], (T, Source[T, Unit]), Unit] =
def headAndTail: Source[(T, Source[T, Unit]), Mat] = Flow[Source[T, Any]]
underlying.map { .map {
_.prefixAndTail(1) _.prefixAndTail(1)
.filter(_._1.nonEmpty) .filter(_._1.nonEmpty)
.map { case (prefix, tail) (prefix.head, tail) } .map { case (prefix, tail) (prefix.head, tail) }
} }
.flatten(FlattenStrategy.concat) .flatten(FlattenStrategy.concat)
}
private[http] implicit class FlowWithHeadAndTail[In, Out, Mat](val underlying: Flow[In, Source[Out, Any], Mat]) extends AnyVal {
def headAndTail: Flow[In, (Out, Source[Out, Unit]), Mat] =
underlying.map {
_.prefixAndTail(1)
.filter(_._1.nonEmpty)
.map { case (prefix, tail) (prefix.head, tail) }
}
.flatten(FlattenStrategy.concat)
}
private[http] def printEvent[T](marker: String): Flow[T, T, Unit] = private[http] def printEvent[T](marker: String): Flow[T, T, Unit] =
Flow[T].transform(() new PushPullStage[T, T] { Flow[T].transform(() new PushPullStage[T, T] {

View file

@ -453,7 +453,7 @@ class RequestParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll {
.map(ByteString.apply) .map(ByteString.apply)
.transform(() parser.stage).named("parser") .transform(() parser.stage).named("parser")
.splitWhen(x x.isInstanceOf[MessageStart] || x.isInstanceOf[EntityStreamError]) .splitWhen(x x.isInstanceOf[MessageStart] || x.isInstanceOf[EntityStreamError])
.headAndTail .via(headAndTailFlow)
.collect { .collect {
case (RequestStart(method, uri, protocol, headers, createEntity, _, close), entityParts) case (RequestStart(method, uri, protocol, headers, createEntity, _, close), entityParts)
closeAfterResponseCompletion :+= close closeAfterResponseCompletion :+= close

View file

@ -264,7 +264,7 @@ class ResponseParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll {
.map(ByteString.apply) .map(ByteString.apply)
.transform(() newParserStage(requestMethod)).named("parser") .transform(() newParserStage(requestMethod)).named("parser")
.splitWhen(x x.isInstanceOf[MessageStart] || x.isInstanceOf[EntityStreamError]) .splitWhen(x x.isInstanceOf[MessageStart] || x.isInstanceOf[EntityStreamError])
.headAndTail .via(headAndTailFlow)
.collect { .collect {
case (ResponseStart(statusCode, protocol, headers, createEntity, close), entityParts) case (ResponseStart(statusCode, protocol, headers, createEntity, close), entityParts)
closeAfterResponseCompletion :+= close closeAfterResponseCompletion :+= close