From 3f0c23e2dae65242d70c0805085ca6eda9f9a23d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Wed, 15 Mar 2017 11:13:13 +0100 Subject: [PATCH] Fail framing stage if length field is less than 0 #22367 --- .../akka/stream/scaladsl/FramingSpec.scala | 37 ++++++++++++++++++- .../scala/akka/stream/scaladsl/Framing.scala | 2 + 2 files changed, 38 insertions(+), 1 deletion(-) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FramingSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FramingSpec.scala index 72a773ad38..914ce4c91a 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FramingSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FramingSpec.scala @@ -10,13 +10,13 @@ import akka.stream._ import akka.stream.scaladsl.Framing.FramingException import akka.stream.stage.{ GraphStage, _ } import akka.stream.testkit.{ StreamSpec, TestPublisher, TestSubscriber } +import akka.testkit.LongRunningTest import akka.util.{ ByteString, ByteStringBuilder } import org.scalatest.concurrent.PatienceConfiguration.Timeout import scala.collection.immutable import scala.concurrent.duration._ import scala.util.Random -import akka.testkit.LongRunningTest class FramingSpec extends StreamSpec { @@ -265,6 +265,41 @@ class FramingSpec extends StreamSpec { .futureValue should ===(testMessages) } + "fail the stage on negative length field values (#22367)" in { + implicit val bo = java.nio.ByteOrder.LITTLE_ENDIAN + + // A 4-byte message containing only an Int specifying the length of the payload + // The issue shows itself if length in message is less than or equal + // to -4 (if expected length field is length 4) + val bs = ByteString.newBuilder.putInt(-4).result() + + val res = + Source + .single(bs) + .via(Flow[ByteString].via(Framing.lengthField(4, 0, 1000))) + .runWith(Sink.seq) + + val ex = res.failed.futureValue + ex shouldBe a[FramingException] + ex.getMessage should ===("Decoded frame header reported negative size -4") + } + + "let zero length field values pass through (#22367)" in { + implicit val bo = java.nio.ByteOrder.LITTLE_ENDIAN + + // Interleave empty frames with a frame with data + val encodedPayload = encode(ByteString(42), 0, 4, bo) + val emptyFrame = encode(ByteString(), 0, 4, bo) + val bs = Vector(emptyFrame, encodedPayload, emptyFrame) + + val res = + Source(bs) + .via(Flow[ByteString].via(Framing.lengthField(4, 0, 1000))) + .runWith(Sink.seq) + + res.futureValue should equal(Seq(emptyFrame, encodedPayload, emptyFrame)) + + } } } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Framing.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Framing.scala index 87fb7b6296..00aab0b6b6 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Framing.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Framing.scala @@ -266,6 +266,8 @@ object Framing { frameSize = parsedLength + minimumChunkSize if (frameSize > maximumFrameLength) { failStage(new FramingException(s"Maximum allowed frame size is $maximumFrameLength but decoded frame header reported size $frameSize")) + } else if (parsedLength < 0) { + failStage(new FramingException(s"Decoded frame header reported negative size $parsedLength")) } else if (buffSize >= frameSize) { pushFrame() } else tryPull()