Remove simple line parsing, exposing explicit delimiter stage instead

This commit is contained in:
Endre Sándor Varga 2015-06-23 16:55:27 +02:00
parent 79e24c5976
commit 8702f09f10
7 changed files with 21 additions and 33 deletions

View file

@ -23,7 +23,7 @@ which will emit an :class:`IncomingConnection` element for each new connection t
Next, we simply handle *each* incoming connection using a :class:`Flow` which will be used as the processing stage Next, we simply handle *each* incoming connection using a :class:`Flow` which will be used as the processing stage
to handle and emit ByteStrings from and to the TCP Socket. Since one :class:`ByteString` does not have to necessarily to handle and emit ByteStrings from and to the TCP Socket. Since one :class:`ByteString` does not have to necessarily
correspond to exactly one line of text (the client might be sending the line in chunks) we use the ``lines`` correspond to exactly one line of text (the client might be sending the line in chunks) we use the ``delimiter``
helper Flow from ``akka.stream.io.Framing`` to chunk the inputs up into actual lines of text. The last boolean helper Flow from ``akka.stream.io.Framing`` to chunk the inputs up into actual lines of text. The last boolean
argument indicates that we require an explicit line ending even for the last message before the connection is closed. argument indicates that we require an explicit line ending even for the last message before the connection is closed.
In this example we simply add exclamation marks to each incoming text message and push it through the flow: In this example we simply add exclamation marks to each incoming text message and push it through the flow:

View file

@ -22,7 +22,8 @@ class RecipeParseLines extends RecipeSpec {
//#parse-lines //#parse-lines
import akka.stream.io.Framing import akka.stream.io.Framing
val linesStream = rawData.via(Framing.lines("\r\n", maximumLineBytes = 100)) val linesStream = rawData.via(
Framing.delimiter(ByteString("\r\n"), maximumFrameLength = 100, allowTruncation = true)).map(_.utf8String)
//#parse-lines //#parse-lines
Await.result(linesStream.grouped(10).runWith(Sink.head), 3.seconds) should be(List( Await.result(linesStream.grouped(10).runWith(Sink.head), 3.seconds) should be(List(

View file

@ -47,7 +47,8 @@ class StreamTcpDocSpec extends AkkaSpec {
println(s"New connection from: ${connection.remoteAddress}") println(s"New connection from: ${connection.remoteAddress}")
val echo = Flow[ByteString] val echo = Flow[ByteString]
.via(Framing.lines("\n", maximumLineBytes = 256, allowTruncation = false)) .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 256, allowTruncation = true))
.map(_.utf8String)
.map(_ + "!!!\n") .map(_ + "!!!\n")
.map(ByteString(_)) .map(ByteString(_))
@ -85,7 +86,8 @@ class StreamTcpDocSpec extends AkkaSpec {
val welcome = Source.single(ByteString(welcomeMsg)) val welcome = Source.single(ByteString(welcomeMsg))
val echo = b.add(Flow[ByteString] val echo = b.add(Flow[ByteString]
.via(Framing.lines("\n", maximumLineBytes = 256, allowTruncation = false)) .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 256, allowTruncation = true))
.map(_.utf8String)
//#welcome-banner-chat-server //#welcome-banner-chat-server
.map { command serverProbe.ref ! command; command } .map { command serverProbe.ref ! command; command }
//#welcome-banner-chat-server //#welcome-banner-chat-server
@ -136,7 +138,8 @@ class StreamTcpDocSpec extends AkkaSpec {
} }
val repl = Flow[ByteString] val repl = Flow[ByteString]
.via(Framing.lines("\n", maximumLineBytes = 256, allowTruncation = false)) .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 256, allowTruncation = true))
.map(_.utf8String)
.map(text => println("Server: " + text)) .map(text => println("Server: " + text))
.map(_ => readLine("> ")) .map(_ => readLine("> "))
.transform(() replParser) .transform(() replParser)

View file

@ -96,8 +96,7 @@ Parsing lines from a stream of ByteStrings
characters (or, alternatively, containing binary frames delimited by a special delimiter byte sequence) which characters (or, alternatively, containing binary frames delimited by a special delimiter byte sequence) which
needs to be parsed. needs to be parsed.
The :class:`Framing` helper object contains a convenience method to parse messages from a stream of ``ByteStrings`` The :class:`Framing` helper object contains a convenience method to parse messages from a stream of ``ByteStrings``:
and in particular it has basic support for parsing text lines:
.. includecode:: code/docs/stream/cookbook/RecipeParseLines.scala#parse-lines .. includecode:: code/docs/stream/cookbook/RecipeParseLines.scala#parse-lines

View file

@ -23,7 +23,7 @@ which will emit an :class:`IncomingConnection` element for each new connection t
Next, we simply handle *each* incoming connection using a :class:`Flow` which will be used as the processing stage Next, we simply handle *each* incoming connection using a :class:`Flow` which will be used as the processing stage
to handle and emit ByteStrings from and to the TCP Socket. Since one :class:`ByteString` does not have to necessarily to handle and emit ByteStrings from and to the TCP Socket. Since one :class:`ByteString` does not have to necessarily
correspond to exactly one line of text (the client might be sending the line in chunks) we use the ``Framing.lines`` correspond to exactly one line of text (the client might be sending the line in chunks) we use the ``Framing.delmiter``
helper Flow to chunk the inputs up into actual lines of text. The last boolean helper Flow to chunk the inputs up into actual lines of text. The last boolean
argument indicates that we require an explicit line ending even for the last message before the connection is closed. argument indicates that we require an explicit line ending even for the last message before the connection is closed.
In this example we simply add exclamation marks to each incoming text message and push it through the flow: In this example we simply add exclamation marks to each incoming text message and push it through the flow:

View file

@ -61,6 +61,11 @@ class FramingSpec extends AkkaSpec {
val delimiterBytes = List("\n", "\r\n", "FOO").map(ByteString(_)) val delimiterBytes = List("\n", "\r\n", "FOO").map(ByteString(_))
val baseTestSequences = List("", "foo", "hello world").map(ByteString(_)) val baseTestSequences = List("", "foo", "hello world").map(ByteString(_))
// Helper to simplify testing
def simpleLines(delimiter: String, maximumBytes: Int, allowTruncation: Boolean = true) =
Framing.delimiter(ByteString(delimiter), maximumBytes, allowTruncation).map(_.utf8String)
.named("lineFraming")
def completeTestSequences(delimiter: ByteString): immutable.Iterable[ByteString] = def completeTestSequences(delimiter: ByteString): immutable.Iterable[ByteString] =
for (prefix 0 until delimiter.size; s baseTestSequences) for (prefix 0 until delimiter.size; s baseTestSequences)
yield delimiter.take(prefix) ++ s yield delimiter.take(prefix) ++ s
@ -81,19 +86,19 @@ class FramingSpec extends AkkaSpec {
"Respect maximum line settings" in { "Respect maximum line settings" in {
// The buffer will contain more than 1 bytes, but the individual frames are less // The buffer will contain more than 1 bytes, but the individual frames are less
Await.result( Await.result(
Source.single(ByteString("a\nb\nc\nd\n")).via(Framing.lines("\n", 1)).grouped(100).runWith(Sink.head), Source.single(ByteString("a\nb\nc\nd\n")).via(simpleLines("\n", 1)).grouped(100).runWith(Sink.head),
3.seconds) should ===(List("a", "b", "c", "d")) 3.seconds) should ===(List("a", "b", "c", "d"))
an[FramingException] should be thrownBy { an[FramingException] should be thrownBy {
Await.result( Await.result(
Source.single(ByteString("ab\n")).via(Framing.lines("\n", 1)).grouped(100).runWith(Sink.head), Source.single(ByteString("ab\n")).via(simpleLines("\n", 1)).grouped(100).runWith(Sink.head),
3.seconds) 3.seconds)
} }
} }
"work with empty streams" in { "work with empty streams" in {
Await.result( Await.result(
Source.empty.via(Framing.lines("\n", 256)).runFold(Vector.empty[String])(_ :+ _), Source.empty.via(simpleLines("\n", 256)).runFold(Vector.empty[String])(_ :+ _),
3.seconds) should ===(Vector.empty) 3.seconds) should ===(Vector.empty)
} }
@ -101,7 +106,7 @@ class FramingSpec extends AkkaSpec {
an[FramingException] should be thrownBy { an[FramingException] should be thrownBy {
Await.result( Await.result(
Source.single(ByteString("I have no end")) Source.single(ByteString("I have no end"))
.via(Framing.lines("\n", 256, allowTruncation = false)) .via(simpleLines("\n", 256, allowTruncation = false))
.grouped(1000) .grouped(1000)
.runWith(Sink.head), .runWith(Sink.head),
3.seconds) 3.seconds)
@ -111,7 +116,7 @@ class FramingSpec extends AkkaSpec {
"allow truncated frames if configured so" in { "allow truncated frames if configured so" in {
Await.result( Await.result(
Source.single(ByteString("I have no end")) Source.single(ByteString("I have no end"))
.via(Framing.lines("\n", 256, allowTruncation = true)) .via(simpleLines("\n", 256, allowTruncation = true))
.grouped(1000) .grouped(1000)
.runWith(Sink.head), .runWith(Sink.head),
3.seconds) should ===(List("I have no end")) 3.seconds) should ===(List("I have no end"))

View file

@ -34,26 +34,6 @@ object Framing {
Flow[ByteString].transform(() new DelimiterFramingStage(delimiter, maximumFrameLength, allowTruncation)) Flow[ByteString].transform(() new DelimiterFramingStage(delimiter, maximumFrameLength, allowTruncation))
.named("delimiterFraming") .named("delimiterFraming")
/**
* A convenience wrapper on top of [[Framing#delimiter]] using ''String'' as the output and separator sequence types.
* Returns a Flow that decodes an unstructured input stream of byte chunks, decoding them to Strings using a separator
* String as end-of-line marker.
*
* This decoder stage treats decoded frames as simple byte sequences, converting to UTF-8 only after the frame
* boundary has been found. This means that this is not a fully UTF-8 compliant line parser.
*
* @param delimiter String to be used as a delimiter. Be aware that not all UTF-8 strings are safe to use as a
* delimiter when the input bytes are UTF-8 encoded.
* @param allowTruncation If turned on, then when the last string being decoded contains no valid delimiter this Flow
* fails the stream instead of returning a truncated string.
* @param maximumLineBytes
* The maximum allowed length for decoded strings in bytes (not in characters).
* @return
*/
def lines(delimiter: String, maximumLineBytes: Int, allowTruncation: Boolean = true): Flow[ByteString, String, Unit] =
Framing.delimiter(ByteString(delimiter), maximumLineBytes, allowTruncation).map(_.utf8String)
.named("lineFraming")
/** /**
* Creates a Flow that decodes an incoming stream of unstructured byte chunks into a stream of frames, assuming that * Creates a Flow that decodes an incoming stream of unstructured byte chunks into a stream of frames, assuming that
* incoming frames have a field that encodes their length. * incoming frames have a field that encodes their length.