Fail framing stage if length field is less than 0 #22367
This commit is contained in:
parent
60d918c490
commit
3f0c23e2da
2 changed files with 38 additions and 1 deletions
|
|
@ -10,13 +10,13 @@ import akka.stream._
|
|||
import akka.stream.scaladsl.Framing.FramingException
|
||||
import akka.stream.stage.{ GraphStage, _ }
|
||||
import akka.stream.testkit.{ StreamSpec, TestPublisher, TestSubscriber }
|
||||
import akka.testkit.LongRunningTest
|
||||
import akka.util.{ ByteString, ByteStringBuilder }
|
||||
import org.scalatest.concurrent.PatienceConfiguration.Timeout
|
||||
|
||||
import scala.collection.immutable
|
||||
import scala.concurrent.duration._
|
||||
import scala.util.Random
|
||||
import akka.testkit.LongRunningTest
|
||||
|
||||
class FramingSpec extends StreamSpec {
|
||||
|
||||
|
|
@ -265,6 +265,41 @@ class FramingSpec extends StreamSpec {
|
|||
.futureValue should ===(testMessages)
|
||||
}
|
||||
|
||||
"fail the stage on negative length field values (#22367)" in {
|
||||
implicit val bo = java.nio.ByteOrder.LITTLE_ENDIAN
|
||||
|
||||
// A 4-byte message containing only an Int specifying the length of the payload
|
||||
// The issue shows itself if length in message is less than or equal
|
||||
// to -4 (if expected length field is length 4)
|
||||
val bs = ByteString.newBuilder.putInt(-4).result()
|
||||
|
||||
val res =
|
||||
Source
|
||||
.single(bs)
|
||||
.via(Flow[ByteString].via(Framing.lengthField(4, 0, 1000)))
|
||||
.runWith(Sink.seq)
|
||||
|
||||
val ex = res.failed.futureValue
|
||||
ex shouldBe a[FramingException]
|
||||
ex.getMessage should ===("Decoded frame header reported negative size -4")
|
||||
}
|
||||
|
||||
"let zero length field values pass through (#22367)" in {
|
||||
implicit val bo = java.nio.ByteOrder.LITTLE_ENDIAN
|
||||
|
||||
// Interleave empty frames with a frame with data
|
||||
val encodedPayload = encode(ByteString(42), 0, 4, bo)
|
||||
val emptyFrame = encode(ByteString(), 0, 4, bo)
|
||||
val bs = Vector(emptyFrame, encodedPayload, emptyFrame)
|
||||
|
||||
val res =
|
||||
Source(bs)
|
||||
.via(Flow[ByteString].via(Framing.lengthField(4, 0, 1000)))
|
||||
.runWith(Sink.seq)
|
||||
|
||||
res.futureValue should equal(Seq(emptyFrame, encodedPayload, emptyFrame))
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -266,6 +266,8 @@ object Framing {
|
|||
frameSize = parsedLength + minimumChunkSize
|
||||
if (frameSize > maximumFrameLength) {
|
||||
failStage(new FramingException(s"Maximum allowed frame size is $maximumFrameLength but decoded frame header reported size $frameSize"))
|
||||
} else if (parsedLength < 0) {
|
||||
failStage(new FramingException(s"Decoded frame header reported negative size $parsedLength"))
|
||||
} else if (buffSize >= frameSize) {
|
||||
pushFrame()
|
||||
} else tryPull()
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue