Merge pull request #17446 from drewhk/wip-17310-basic-framing-support-drewhk

+str #17310: Basic framing support
This commit is contained in:
drewhk 2015-06-12 10:40:01 +02:00
commit 16afc46931
9 changed files with 589 additions and 85 deletions

View file

@ -20,9 +20,10 @@ class RecipeParseLines extends RecipeSpec {
ByteString("\nHello Akka!\r\nHello Streams!"),
ByteString("\r\n\r\n")))
import RecipeParseLines._
val linesStream = rawData.transform(() => parseLines("\r\n", 100))
//#parse-lines
import akka.stream.io.Framing
val linesStream = rawData.via(Framing.lines("\r\n", maximumLineBytes = 100))
//#parse-lines
Await.result(linesStream.grouped(10).runWith(Sink.head), 3.seconds) should be(List(
"Hello World\r!",
@ -34,58 +35,3 @@ class RecipeParseLines extends RecipeSpec {
}
}
object RecipeParseLines {
import akka.stream.stage._
//#parse-lines
def parseLines(separator: String, maximumLineBytes: Int) =
new StatefulStage[ByteString, String] {
private val separatorBytes = ByteString(separator)
private val firstSeparatorByte = separatorBytes.head
private var buffer = ByteString.empty
private var nextPossibleMatch = 0
def initial = new State {
override def onPush(chunk: ByteString, ctx: Context[String]): SyncDirective = {
buffer ++= chunk
if (buffer.size > maximumLineBytes)
ctx.fail(new IllegalStateException(s"Read ${buffer.size} bytes " +
s"which is more than $maximumLineBytes without seeing a line terminator"))
else emit(doParse(Vector.empty).iterator, ctx)
}
@tailrec
private def doParse(parsedLinesSoFar: Vector[String]): Vector[String] = {
val possibleMatchPos = buffer.indexOf(firstSeparatorByte, from = nextPossibleMatch)
if (possibleMatchPos == -1) {
// No matching character, we need to accumulate more bytes into the buffer
nextPossibleMatch = buffer.size
parsedLinesSoFar
} else if (possibleMatchPos + separatorBytes.size > buffer.size) {
// We have found a possible match (we found the first character of the terminator
// sequence) but we don't have yet enough bytes. We remember the position to
// retry from next time.
nextPossibleMatch = possibleMatchPos
parsedLinesSoFar
} else {
if (buffer.slice(possibleMatchPos, possibleMatchPos + separatorBytes.size)
== separatorBytes) {
// Found a match
val parsedLine = buffer.slice(0, possibleMatchPos).utf8String
buffer = buffer.drop(possibleMatchPos + separatorBytes.size)
nextPossibleMatch -= possibleMatchPos + separatorBytes.size
doParse(parsedLinesSoFar :+ parsedLine)
} else {
nextPossibleMatch += 1
doParse(parsedLinesSoFar)
}
}
}
}
}
//#parse-lines
}

View file

@ -41,11 +41,13 @@ class StreamTcpDocSpec extends AkkaSpec {
Tcp().bind(localhost.getHostName, localhost.getPort) // TODO getHostString in Java7
//#echo-server-simple-handle
import akka.stream.io.Framing
connections runForeach { connection =>
println(s"New connection from: ${connection.remoteAddress}")
val echo = Flow[ByteString]
.transform(() => RecipeParseLines.parseLines("\n", maximumLineBytes = 256))
.via(Framing.lines("\n", maximumLineBytes = 256, allowTruncation = false))
.map(_ + "!!!\n")
.map(ByteString(_))
@ -60,7 +62,9 @@ class StreamTcpDocSpec extends AkkaSpec {
val connections = Tcp().bind(localhost.getHostName, localhost.getPort) // TODO getHostString in Java7
val serverProbe = TestProbe()
import akka.stream.io.Framing
//#welcome-banner-chat-server
connections runForeach { connection =>
val serverLogic = Flow() { implicit b =>
@ -81,7 +85,7 @@ class StreamTcpDocSpec extends AkkaSpec {
val welcome = Source.single(ByteString(welcomeMsg))
val echo = b.add(Flow[ByteString]
.transform(() => RecipeParseLines.parseLines("\n", maximumLineBytes = 256))
.via(Framing.lines("\n", maximumLineBytes = 256, allowTruncation = false))
//#welcome-banner-chat-server
.map { command serverProbe.ref ! command; command }
//#welcome-banner-chat-server
@ -101,6 +105,7 @@ class StreamTcpDocSpec extends AkkaSpec {
connection.handleWith(serverLogic)
}
import akka.stream.io.Framing
//#welcome-banner-chat-server
val input = new AtomicReference("Hello world" :: "What a lovely day" :: Nil)
@ -131,7 +136,7 @@ class StreamTcpDocSpec extends AkkaSpec {
}
val repl = Flow[ByteString]
.transform(() => RecipeParseLines.parseLines("\n", maximumLineBytes = 256))
.via(Framing.lines("\n", maximumLineBytes = 256, allowTruncation = false))
.map(text => println("Server: " + text))
.map(_ => readLine("> "))
.transform(() replParser)

View file

@ -96,22 +96,8 @@ Parsing lines from a stream of ByteStrings
characters (or, alternatively, containing binary frames delimited by a special delimiter byte sequence) which
needs to be parsed.
We express our solution as a :class:`StatefulStage` because it has support for emitting multiple elements easily
through its ``emit(iterator, ctx)`` helper method. Since an incoming ByteString chunk might contain multiple lines (frames)
this feature comes in handy.
To create the parser we only need to hook into the ``onPush`` handler. We maintain a buffer of bytes (expressed as
a :class:`ByteString`) by simply concatenating incoming chunks with it. Since we don't want to allow unbounded size
lines (records) we always check if the buffer size is larger than the allowed ``maximumLineBytes`` value, and terminate
the stream if this invariant is violated.
After we updated the buffer, we try to find the terminator sequence as a subsequence of the current buffer. To be
efficient, we also maintain a pointer ``nextPossibleMatch`` into the buffer so that we only search that part of the
buffer where new matches are possible.
The search for a match is done in two steps: first we try to search for the first character of the terminator sequence
in the buffer. If we find a match, we do a full subsequence check to see if we had a false positive or not. The parsing
logic is recursive to be able to parse multiple lines (records) contained in the decoding buffer.
The :class:`Framing` helper object contains a convenience method to parse messages from a stream of ``ByteStrings``
and in particular it has basic support for parsing text lines:
.. includecode:: code/docs/stream/cookbook/RecipeParseLines.scala#parse-lines

View file

@ -23,8 +23,9 @@ which will emit an :class:`IncomingConnection` element for each new connection t
Next, we simply handle *each* incoming connection using a :class:`Flow` which will be used as the processing stage
to handle and emit ByteStrings from and to the TCP Socket. Since one :class:`ByteString` does not have to necessarily
correspond to exactly one line of text (the client might be sending the line in chunks) we use the ``parseLines``
recipe from the :ref:`cookbook-parse-lines-scala` Akka Streams Cookbook recipe to chunk the inputs up into actual lines of text.
correspond to exactly one line of text (the client might be sending the line in chunks) we use the ``Framing.lines``
helper Flow to chunk the inputs up into actual lines of text. The last boolean
argument indicates that we require an explicit line ending even for the last message before the connection is closed.
In this example we simply add exclamation marks to each incoming text message and push it through the flow:
.. includecode:: code/docs/stream/io/StreamTcpDocSpec.scala#echo-server-simple-handle