From 8702f09f101ff38d80fb5efbb76623053f227f2d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Endre=20S=C3=A1ndor=20Varga?= Date: Tue, 23 Jun 2015 16:55:27 +0200 Subject: [PATCH] Remove simple line parsing, exposing explicit delimiter stage instead --- akka-docs-dev/rst/java/stream-io.rst | 2 +- .../stream/cookbook/RecipeParseLines.scala | 3 ++- .../docs/stream/io/StreamTcpDocSpec.scala | 9 ++++++--- akka-docs-dev/rst/scala/stream-cookbook.rst | 3 +-- akka-docs-dev/rst/scala/stream-io.rst | 2 +- .../scala/akka/stream/io/FramingSpec.scala | 15 +++++++++----- .../main/scala/akka/stream/io/Framing.scala | 20 ------------------- 7 files changed, 21 insertions(+), 33 deletions(-) diff --git a/akka-docs-dev/rst/java/stream-io.rst b/akka-docs-dev/rst/java/stream-io.rst index 743992e84a..1b0a6f9a5f 100644 --- a/akka-docs-dev/rst/java/stream-io.rst +++ b/akka-docs-dev/rst/java/stream-io.rst @@ -23,7 +23,7 @@ which will emit an :class:`IncomingConnection` element for each new connection t Next, we simply handle *each* incoming connection using a :class:`Flow` which will be used as the processing stage to handle and emit ByteStrings from and to the TCP Socket. Since one :class:`ByteString` does not have to necessarily -correspond to exactly one line of text (the client might be sending the line in chunks) we use the ``lines`` +correspond to exactly one line of text (the client might be sending the line in chunks) we use the ``delimiter`` helper Flow from ``akka.stream.io.Framing`` to chunk the inputs up into actual lines of text. The last boolean argument indicates that we require an explicit line ending even for the last message before the connection is closed. In this example we simply add exclamation marks to each incoming text message and push it through the flow: diff --git a/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeParseLines.scala b/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeParseLines.scala index f469aebca3..6b04285231 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeParseLines.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeParseLines.scala @@ -22,7 +22,8 @@ class RecipeParseLines extends RecipeSpec { //#parse-lines import akka.stream.io.Framing - val linesStream = rawData.via(Framing.lines("\r\n", maximumLineBytes = 100)) + val linesStream = rawData.via( + Framing.delimiter(ByteString("\r\n"), maximumFrameLength = 100, allowTruncation = true)).map(_.utf8String) //#parse-lines Await.result(linesStream.grouped(10).runWith(Sink.head), 3.seconds) should be(List( diff --git a/akka-docs-dev/rst/scala/code/docs/stream/io/StreamTcpDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/io/StreamTcpDocSpec.scala index 0c5b191a6a..821576f118 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/io/StreamTcpDocSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/io/StreamTcpDocSpec.scala @@ -47,7 +47,8 @@ class StreamTcpDocSpec extends AkkaSpec { println(s"New connection from: ${connection.remoteAddress}") val echo = Flow[ByteString] - .via(Framing.lines("\n", maximumLineBytes = 256, allowTruncation = false)) + .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 256, allowTruncation = true)) + .map(_.utf8String) .map(_ + "!!!\n") .map(ByteString(_)) @@ -85,7 +86,8 @@ class StreamTcpDocSpec extends AkkaSpec { val welcome = Source.single(ByteString(welcomeMsg)) val echo = b.add(Flow[ByteString] - .via(Framing.lines("\n", maximumLineBytes = 256, allowTruncation = false)) + .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 256, allowTruncation = true)) + .map(_.utf8String) //#welcome-banner-chat-server .map { command ⇒ serverProbe.ref ! command; command } //#welcome-banner-chat-server @@ -136,7 +138,8 @@ class StreamTcpDocSpec extends AkkaSpec { } val repl = Flow[ByteString] - .via(Framing.lines("\n", maximumLineBytes = 256, allowTruncation = false)) + .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 256, allowTruncation = true)) + .map(_.utf8String) .map(text => println("Server: " + text)) .map(_ => readLine("> ")) .transform(() ⇒ replParser) diff --git a/akka-docs-dev/rst/scala/stream-cookbook.rst b/akka-docs-dev/rst/scala/stream-cookbook.rst index 2e5c63aa4c..79fa3796ac 100644 --- a/akka-docs-dev/rst/scala/stream-cookbook.rst +++ b/akka-docs-dev/rst/scala/stream-cookbook.rst @@ -96,8 +96,7 @@ Parsing lines from a stream of ByteStrings characters (or, alternatively, containing binary frames delimited by a special delimiter byte sequence) which needs to be parsed. -The :class:`Framing` helper object contains a convenience method to parse messages from a stream of ``ByteStrings`` -and in particular it has basic support for parsing text lines: +The :class:`Framing` helper object contains a convenience method to parse messages from a stream of ``ByteStrings``: .. includecode:: code/docs/stream/cookbook/RecipeParseLines.scala#parse-lines diff --git a/akka-docs-dev/rst/scala/stream-io.rst b/akka-docs-dev/rst/scala/stream-io.rst index 0e9b0c1046..9973b2bf9b 100644 --- a/akka-docs-dev/rst/scala/stream-io.rst +++ b/akka-docs-dev/rst/scala/stream-io.rst @@ -23,7 +23,7 @@ which will emit an :class:`IncomingConnection` element for each new connection t Next, we simply handle *each* incoming connection using a :class:`Flow` which will be used as the processing stage to handle and emit ByteStrings from and to the TCP Socket. Since one :class:`ByteString` does not have to necessarily -correspond to exactly one line of text (the client might be sending the line in chunks) we use the ``Framing.lines`` +correspond to exactly one line of text (the client might be sending the line in chunks) we use the ``Framing.delmiter`` helper Flow to chunk the inputs up into actual lines of text. The last boolean argument indicates that we require an explicit line ending even for the last message before the connection is closed. In this example we simply add exclamation marks to each incoming text message and push it through the flow: diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/FramingSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/FramingSpec.scala index 0c1c05d70a..560fcd7aa3 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/FramingSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/FramingSpec.scala @@ -61,6 +61,11 @@ class FramingSpec extends AkkaSpec { val delimiterBytes = List("\n", "\r\n", "FOO").map(ByteString(_)) val baseTestSequences = List("", "foo", "hello world").map(ByteString(_)) + // Helper to simplify testing + def simpleLines(delimiter: String, maximumBytes: Int, allowTruncation: Boolean = true) = + Framing.delimiter(ByteString(delimiter), maximumBytes, allowTruncation).map(_.utf8String) + .named("lineFraming") + def completeTestSequences(delimiter: ByteString): immutable.Iterable[ByteString] = for (prefix ← 0 until delimiter.size; s ← baseTestSequences) yield delimiter.take(prefix) ++ s @@ -81,19 +86,19 @@ class FramingSpec extends AkkaSpec { "Respect maximum line settings" in { // The buffer will contain more than 1 bytes, but the individual frames are less Await.result( - Source.single(ByteString("a\nb\nc\nd\n")).via(Framing.lines("\n", 1)).grouped(100).runWith(Sink.head), + Source.single(ByteString("a\nb\nc\nd\n")).via(simpleLines("\n", 1)).grouped(100).runWith(Sink.head), 3.seconds) should ===(List("a", "b", "c", "d")) an[FramingException] should be thrownBy { Await.result( - Source.single(ByteString("ab\n")).via(Framing.lines("\n", 1)).grouped(100).runWith(Sink.head), + Source.single(ByteString("ab\n")).via(simpleLines("\n", 1)).grouped(100).runWith(Sink.head), 3.seconds) } } "work with empty streams" in { Await.result( - Source.empty.via(Framing.lines("\n", 256)).runFold(Vector.empty[String])(_ :+ _), + Source.empty.via(simpleLines("\n", 256)).runFold(Vector.empty[String])(_ :+ _), 3.seconds) should ===(Vector.empty) } @@ -101,7 +106,7 @@ class FramingSpec extends AkkaSpec { an[FramingException] should be thrownBy { Await.result( Source.single(ByteString("I have no end")) - .via(Framing.lines("\n", 256, allowTruncation = false)) + .via(simpleLines("\n", 256, allowTruncation = false)) .grouped(1000) .runWith(Sink.head), 3.seconds) @@ -111,7 +116,7 @@ class FramingSpec extends AkkaSpec { "allow truncated frames if configured so" in { Await.result( Source.single(ByteString("I have no end")) - .via(Framing.lines("\n", 256, allowTruncation = true)) + .via(simpleLines("\n", 256, allowTruncation = true)) .grouped(1000) .runWith(Sink.head), 3.seconds) should ===(List("I have no end")) diff --git a/akka-stream/src/main/scala/akka/stream/io/Framing.scala b/akka-stream/src/main/scala/akka/stream/io/Framing.scala index 4331bc13fb..be4fb4570a 100644 --- a/akka-stream/src/main/scala/akka/stream/io/Framing.scala +++ b/akka-stream/src/main/scala/akka/stream/io/Framing.scala @@ -34,26 +34,6 @@ object Framing { Flow[ByteString].transform(() ⇒ new DelimiterFramingStage(delimiter, maximumFrameLength, allowTruncation)) .named("delimiterFraming") - /** - * A convenience wrapper on top of [[Framing#delimiter]] using ''String'' as the output and separator sequence types. - * Returns a Flow that decodes an unstructured input stream of byte chunks, decoding them to Strings using a separator - * String as end-of-line marker. - * - * This decoder stage treats decoded frames as simple byte sequences, converting to UTF-8 only after the frame - * boundary has been found. This means that this is not a fully UTF-8 compliant line parser. - * - * @param delimiter String to be used as a delimiter. Be aware that not all UTF-8 strings are safe to use as a - * delimiter when the input bytes are UTF-8 encoded. - * @param allowTruncation If turned on, then when the last string being decoded contains no valid delimiter this Flow - * fails the stream instead of returning a truncated string. - * @param maximumLineBytes - * The maximum allowed length for decoded strings in bytes (not in characters). - * @return - */ - def lines(delimiter: String, maximumLineBytes: Int, allowTruncation: Boolean = true): Flow[ByteString, String, Unit] = - Framing.delimiter(ByteString(delimiter), maximumLineBytes, allowTruncation).map(_.utf8String) - .named("lineFraming") - /** * Creates a Flow that decodes an incoming stream of unstructured byte chunks into a stream of frames, assuming that * incoming frames have a field that encodes their length.