Fixed decoding in the case of fragmented delimiters #3306

- also improved tests
This commit is contained in:
Endre Sándor Varga 2013-05-06 14:23:54 +02:00
parent 3bc661bed6
commit 3209e38d9d
2 changed files with 69 additions and 23 deletions

View file

@ -9,6 +9,7 @@ import akka.util.ByteString
import akka.actor.{ Props, ActorLogging, Actor, ActorContext }
import akka.TestUtils
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.duration._
class DelimiterFramingSpec extends AkkaSpec {
@ -77,9 +78,26 @@ class DelimiterFramingSpec extends AkkaSpec {
probe.expectMsg(Event(s"testone$expectedDelimiter"))
probe.send(handler, Command(s"two${delimiter}thr"))
probe.expectMsg(Event(s"two$expectedDelimiter"))
Thread.sleep(1000)
probe.expectNoMsg(1.seconds)
probe.send(handler, Command(s"ee$delimiter"))
probe.expectMsg(Event(s"three$expectedDelimiter"))
if (delimiter.size > 1) {
val (first, second) = delimiter.splitAt(1)
// Test a fragmented delimiter
probe.send(handler, Command(s"four$first"))
probe.expectNoMsg(1.seconds)
probe.send(handler, Command(second))
probe.expectMsg(Event(s"four$expectedDelimiter"))
// Test cases of false match on a delimiter fragment
for (piece s"${first}five${first}$delimiter") {
probe.expectNoMsg(100.milliseconds)
probe.send(handler, Command(String.valueOf(piece)))
}
probe.expectMsg(Event(s"${first}five${first}$expectedDelimiter"))
}
probe.send(handler, Command(s"${delimiter}${delimiter}"))
probe.expectMsg(Event(expectedDelimiter))
probe.expectMsg(Event(expectedDelimiter))

View file

@ -840,42 +840,70 @@ class DelimiterFraming(maxSize: Int, delimiter: ByteString = ByteString('\n'), i
override def apply(ctx: PipelineContext) = new SymmetricPipePair[ByteString, ByteString] {
val singleByteDelimiter: Boolean = delimiter.size == 1
var buffer: ByteString = ByteString.empty
var delimiterFragment: Option[ByteString] = None
val firstByteOfDelimiter = delimiter.head
@tailrec
private def extractParts(nextChunk: ByteString, acc: List[ByteString]): List[ByteString] = {
val firstByteOfDelimiter = delimiter.head
val matchPosition = nextChunk.indexOf(firstByteOfDelimiter)
if (matchPosition == -1) {
val minSize = buffer.size + nextChunk.size
if (minSize > maxSize) throw new IllegalArgumentException(
s"Received too large frame of size $minSize (max = $maxSize)")
private def extractParts(nextChunk: ByteString, acc: List[ByteString]): List[ByteString] = delimiterFragment match {
case Some(fragment) if nextChunk.size < fragment.size && fragment.startsWith(nextChunk)
buffer ++= nextChunk
delimiterFragment = Some(fragment.drop(nextChunk.size))
acc
} else {
val missingBytes: Int = if (includeDelimiter) matchPosition + delimiter.size else matchPosition
val expectedSize = buffer.size + missingBytes
if (expectedSize > maxSize) throw new IllegalArgumentException(
s"Received frame already of size $expectedSize (max = $maxSize)")
if (singleByteDelimiter || nextChunk.slice(matchPosition, matchPosition + delimiter.size) == delimiter) {
val decoded = buffer ++ nextChunk.take(missingBytes)
buffer = ByteString.empty
extractParts(nextChunk.drop(matchPosition + delimiter.size), decoded :: acc)
// We got the missing parts of the delimiter
case Some(fragment) if nextChunk.startsWith(fragment)
val decoded = if (includeDelimiter) buffer ++ fragment else buffer.take(buffer.size - delimiter.size + fragment.size)
buffer = ByteString.empty
delimiterFragment = None
extractParts(nextChunk.drop(fragment.size), decoded :: acc)
case _
val matchPosition = nextChunk.indexOf(firstByteOfDelimiter)
if (matchPosition == -1) {
delimiterFragment = None
val minSize = buffer.size + nextChunk.size
if (minSize > maxSize) throw new IllegalArgumentException(
s"Received too large frame of size $minSize (max = $maxSize)")
buffer ++= nextChunk
acc
} else if (matchPosition + delimiter.size > nextChunk.size) {
val delimiterMatchLength = nextChunk.size - matchPosition
if (nextChunk.drop(matchPosition) == delimiter.take(delimiterMatchLength)) {
buffer ++= nextChunk
// we are expecting the other parts of the delimiter
delimiterFragment = Some(delimiter.drop(nextChunk.size - matchPosition))
acc
} else {
// false positive
delimiterFragment = None
buffer ++= nextChunk.take(matchPosition + 1)
extractParts(nextChunk.drop(matchPosition + 1), acc)
}
} else {
buffer ++= nextChunk.take(matchPosition + 1)
extractParts(nextChunk.drop(matchPosition + 1), acc)
delimiterFragment = None
val missingBytes: Int = if (includeDelimiter) matchPosition + delimiter.size else matchPosition
val expectedSize = buffer.size + missingBytes
if (expectedSize > maxSize) throw new IllegalArgumentException(
s"Received frame already of size $expectedSize (max = $maxSize)")
if (singleByteDelimiter || nextChunk.slice(matchPosition, matchPosition + delimiter.size) == delimiter) {
val decoded = buffer ++ nextChunk.take(missingBytes)
buffer = ByteString.empty
extractParts(nextChunk.drop(matchPosition + delimiter.size), decoded :: acc)
} else {
buffer ++= nextChunk.take(matchPosition + 1)
extractParts(nextChunk.drop(matchPosition + 1), acc)
}
}
}
}
override val eventPipeline = {
bs: ByteString
val parts = extractParts(bs, Nil)
buffer = buffer.compact // TODO: This should be properly benchmarked and memory profiled
parts match {
case Nil Nil
case one :: Nil ctx.singleEvent(one)
case many many reverseMap (Left(_))
case one :: Nil ctx.singleEvent(one.compact)
case many many reverseMap { frame Left(frame.compact) }
}
}