diff --git a/akka-actor-tests/src/test/scala/akka/io/DelimiterFramingSpec.scala b/akka-actor-tests/src/test/scala/akka/io/DelimiterFramingSpec.scala index b890b0ee84..1147dd96de 100644 --- a/akka-actor-tests/src/test/scala/akka/io/DelimiterFramingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/DelimiterFramingSpec.scala @@ -9,6 +9,7 @@ import akka.util.ByteString import akka.actor.{ Props, ActorLogging, Actor, ActorContext } import akka.TestUtils import java.util.concurrent.atomic.AtomicInteger +import scala.concurrent.duration._ class DelimiterFramingSpec extends AkkaSpec { @@ -77,9 +78,26 @@ class DelimiterFramingSpec extends AkkaSpec { probe.expectMsg(Event(s"testone$expectedDelimiter")) probe.send(handler, Command(s"two${delimiter}thr")) probe.expectMsg(Event(s"two$expectedDelimiter")) - Thread.sleep(1000) + probe.expectNoMsg(1.seconds) probe.send(handler, Command(s"ee$delimiter")) probe.expectMsg(Event(s"three$expectedDelimiter")) + if (delimiter.size > 1) { + val (first, second) = delimiter.splitAt(1) + + // Test a fragmented delimiter + probe.send(handler, Command(s"four$first")) + probe.expectNoMsg(1.seconds) + probe.send(handler, Command(second)) + probe.expectMsg(Event(s"four$expectedDelimiter")) + + // Test cases of false match on a delimiter fragment + for (piece ← s"${first}five${first}$delimiter") { + probe.expectNoMsg(100.milliseconds) + probe.send(handler, Command(String.valueOf(piece))) + } + probe.expectMsg(Event(s"${first}five${first}$expectedDelimiter")) + + } probe.send(handler, Command(s"${delimiter}${delimiter}")) probe.expectMsg(Event(expectedDelimiter)) probe.expectMsg(Event(expectedDelimiter)) diff --git a/akka-actor/src/main/scala/akka/io/Pipelines.scala b/akka-actor/src/main/scala/akka/io/Pipelines.scala index a473ddfb84..187815c183 100644 --- a/akka-actor/src/main/scala/akka/io/Pipelines.scala +++ b/akka-actor/src/main/scala/akka/io/Pipelines.scala @@ -840,42 +840,70 @@ class DelimiterFraming(maxSize: Int, delimiter: ByteString = ByteString('\n'), i override def apply(ctx: PipelineContext) = new SymmetricPipePair[ByteString, ByteString] { val singleByteDelimiter: Boolean = delimiter.size == 1 var buffer: ByteString = ByteString.empty + var delimiterFragment: Option[ByteString] = None + val firstByteOfDelimiter = delimiter.head @tailrec - private def extractParts(nextChunk: ByteString, acc: List[ByteString]): List[ByteString] = { - val firstByteOfDelimiter = delimiter.head - val matchPosition = nextChunk.indexOf(firstByteOfDelimiter) - if (matchPosition == -1) { - val minSize = buffer.size + nextChunk.size - if (minSize > maxSize) throw new IllegalArgumentException( - s"Received too large frame of size $minSize (max = $maxSize)") + private def extractParts(nextChunk: ByteString, acc: List[ByteString]): List[ByteString] = delimiterFragment match { + case Some(fragment) if nextChunk.size < fragment.size && fragment.startsWith(nextChunk) ⇒ buffer ++= nextChunk + delimiterFragment = Some(fragment.drop(nextChunk.size)) acc - } else { - val missingBytes: Int = if (includeDelimiter) matchPosition + delimiter.size else matchPosition - val expectedSize = buffer.size + missingBytes - if (expectedSize > maxSize) throw new IllegalArgumentException( - s"Received frame already of size $expectedSize (max = $maxSize)") - - if (singleByteDelimiter || nextChunk.slice(matchPosition, matchPosition + delimiter.size) == delimiter) { - val decoded = buffer ++ nextChunk.take(missingBytes) - buffer = ByteString.empty - extractParts(nextChunk.drop(matchPosition + delimiter.size), decoded :: acc) + // We got the missing parts of the delimiter + case Some(fragment) if nextChunk.startsWith(fragment) ⇒ + val decoded = if (includeDelimiter) buffer ++ fragment else buffer.take(buffer.size - delimiter.size + fragment.size) + buffer = ByteString.empty + delimiterFragment = None + extractParts(nextChunk.drop(fragment.size), decoded :: acc) + case _ ⇒ + val matchPosition = nextChunk.indexOf(firstByteOfDelimiter) + if (matchPosition == -1) { + delimiterFragment = None + val minSize = buffer.size + nextChunk.size + if (minSize > maxSize) throw new IllegalArgumentException( + s"Received too large frame of size $minSize (max = $maxSize)") + buffer ++= nextChunk + acc + } else if (matchPosition + delimiter.size > nextChunk.size) { + val delimiterMatchLength = nextChunk.size - matchPosition + if (nextChunk.drop(matchPosition) == delimiter.take(delimiterMatchLength)) { + buffer ++= nextChunk + // we are expecting the other parts of the delimiter + delimiterFragment = Some(delimiter.drop(nextChunk.size - matchPosition)) + acc + } else { + // false positive + delimiterFragment = None + buffer ++= nextChunk.take(matchPosition + 1) + extractParts(nextChunk.drop(matchPosition + 1), acc) + } } else { - buffer ++= nextChunk.take(matchPosition + 1) - extractParts(nextChunk.drop(matchPosition + 1), acc) + delimiterFragment = None + val missingBytes: Int = if (includeDelimiter) matchPosition + delimiter.size else matchPosition + val expectedSize = buffer.size + missingBytes + if (expectedSize > maxSize) throw new IllegalArgumentException( + s"Received frame already of size $expectedSize (max = $maxSize)") + + if (singleByteDelimiter || nextChunk.slice(matchPosition, matchPosition + delimiter.size) == delimiter) { + val decoded = buffer ++ nextChunk.take(missingBytes) + buffer = ByteString.empty + extractParts(nextChunk.drop(matchPosition + delimiter.size), decoded :: acc) + } else { + buffer ++= nextChunk.take(matchPosition + 1) + extractParts(nextChunk.drop(matchPosition + 1), acc) + } } - } } override val eventPipeline = { bs: ByteString ⇒ val parts = extractParts(bs, Nil) + buffer = buffer.compact // TODO: This should be properly benchmarked and memory profiled parts match { case Nil ⇒ Nil - case one :: Nil ⇒ ctx.singleEvent(one) - case many ⇒ many reverseMap (Left(_)) + case one :: Nil ⇒ ctx.singleEvent(one.compact) + case many ⇒ many reverseMap { frame ⇒ Left(frame.compact) } } }