From 444a0deef50ee2bd810a0a6ffa083b652f964f40 Mon Sep 17 00:00:00 2001 From: Johannes Rudolph Date: Tue, 28 Apr 2015 16:14:42 +0200 Subject: [PATCH] =htc #16055 refactoring: make headAndTail a simple flow --- .../client/OutgoingConnectionBlueprint.scala | 2 +- .../engine/server/HttpServerBluePrint.scala | 2 +- .../akka/http/impl/engine/ws/Websocket.scala | 2 +- .../scala/akka/http/impl/util/package.scala | 19 ++++--------------- .../engine/parsing/RequestParserSpec.scala | 2 +- .../engine/parsing/ResponseParserSpec.scala | 2 +- 6 files changed, 9 insertions(+), 20 deletions(-) diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/client/OutgoingConnectionBlueprint.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/client/OutgoingConnectionBlueprint.scala index 5dae6f4042..d624c05c39 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/client/OutgoingConnectionBlueprint.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/client/OutgoingConnectionBlueprint.scala @@ -72,7 +72,7 @@ private[http] object OutgoingConnectionBlueprint { .transform(StreamUtils.recover { case x: ResponseParsingError ⇒ x.error :: Nil }) // FIXME after #16565 .mapConcat(identityFunc) .splitWhen(x ⇒ x.isInstanceOf[MessageStart] || x == MessageEnd) - .headAndTail + .via(headAndTailFlow) .collect { case (ResponseStart(statusCode, protocol, headers, createEntity, _), entityParts) ⇒ HttpResponse(statusCode, headers, createEntity(entityParts), protocol) diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala index 85a5106c76..f8a74d7407 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala @@ -62,7 +62,7 @@ private[http] object HttpServerBluePrint { val requestPreparation = Flow[RequestOutput] .splitWhen(x ⇒ x.isInstanceOf[MessageStart] || x == MessageEnd) - .headAndTail + .via(headAndTailFlow) .map { case (RequestStart(method, uri, protocol, headers, createEntity, _, _), entityParts) ⇒ val effectiveUri = HttpRequest.effectiveUri(uri, headers, securedConnection = false, defaultHostHeader) diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/ws/Websocket.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/ws/Websocket.scala index 03a498095f..bcc0448a77 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/ws/Websocket.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/ws/Websocket.scala @@ -47,7 +47,7 @@ private[http] object Websocket { /** Collects user-level API messages from MessageDataParts */ val collectMessage: Flow[Source[MessageDataPart, Unit], Message, Unit] = Flow[Source[MessageDataPart, Unit]] - .headAndTail + .via(headAndTailFlow) .map { case (TextMessagePart(text, true), remaining) ⇒ TextMessage.Strict(text) diff --git a/akka-http-core/src/main/scala/akka/http/impl/util/package.scala b/akka-http-core/src/main/scala/akka/http/impl/util/package.scala index 9def14b3ab..92141e2c5a 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/util/package.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/util/package.scala @@ -43,25 +43,14 @@ package object util { private[http] implicit def enhanceByteStrings[Mat](byteStrings: Source[ByteString, Mat]): EnhancedByteStringSource[Mat] = new EnhancedByteStringSource(byteStrings) - private[http] implicit class SourceWithHeadAndTail[T, Mat](val underlying: Source[Source[T, Any], Mat]) extends AnyVal { - def headAndTail: Source[(T, Source[T, Unit]), Mat] = - underlying.map { + private[http] def headAndTailFlow[T]: Flow[Source[T, Any], (T, Source[T, Unit]), Unit] = + Flow[Source[T, Any]] + .map { _.prefixAndTail(1) .filter(_._1.nonEmpty) .map { case (prefix, tail) ⇒ (prefix.head, tail) } } - .flatten(FlattenStrategy.concat) - } - - private[http] implicit class FlowWithHeadAndTail[In, Out, Mat](val underlying: Flow[In, Source[Out, Any], Mat]) extends AnyVal { - def headAndTail: Flow[In, (Out, Source[Out, Unit]), Mat] = - underlying.map { - _.prefixAndTail(1) - .filter(_._1.nonEmpty) - .map { case (prefix, tail) ⇒ (prefix.head, tail) } - } - .flatten(FlattenStrategy.concat) - } + .flatten(FlattenStrategy.concat) private[http] def printEvent[T](marker: String): Flow[T, T, Unit] = Flow[T].transform(() ⇒ new PushPullStage[T, T] { diff --git a/akka-http-core/src/test/scala/akka/http/impl/engine/parsing/RequestParserSpec.scala b/akka-http-core/src/test/scala/akka/http/impl/engine/parsing/RequestParserSpec.scala index 724b50d96e..bc613d14f0 100644 --- a/akka-http-core/src/test/scala/akka/http/impl/engine/parsing/RequestParserSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/impl/engine/parsing/RequestParserSpec.scala @@ -453,7 +453,7 @@ class RequestParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll { .map(ByteString.apply) .transform(() ⇒ parser.stage).named("parser") .splitWhen(x ⇒ x.isInstanceOf[MessageStart] || x.isInstanceOf[EntityStreamError]) - .headAndTail + .via(headAndTailFlow) .collect { case (RequestStart(method, uri, protocol, headers, createEntity, _, close), entityParts) ⇒ closeAfterResponseCompletion :+= close diff --git a/akka-http-core/src/test/scala/akka/http/impl/engine/parsing/ResponseParserSpec.scala b/akka-http-core/src/test/scala/akka/http/impl/engine/parsing/ResponseParserSpec.scala index 2ea87dbeb9..20f518796d 100644 --- a/akka-http-core/src/test/scala/akka/http/impl/engine/parsing/ResponseParserSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/impl/engine/parsing/ResponseParserSpec.scala @@ -264,7 +264,7 @@ class ResponseParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll { .map(ByteString.apply) .transform(() ⇒ newParserStage(requestMethod)).named("parser") .splitWhen(x ⇒ x.isInstanceOf[MessageStart] || x.isInstanceOf[EntityStreamError]) - .headAndTail + .via(headAndTailFlow) .collect { case (ResponseStart(statusCode, protocol, headers, createEntity, close), entityParts) ⇒ closeAfterResponseCompletion :+= close