diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Framing.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Framing.scala index b92a07b1bf..3ca9e3f535 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Framing.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Framing.scala @@ -74,8 +74,20 @@ object Framing { * included in this limit. */ def simpleFramingProtocol(maximumMessageLength: Int): BidiFlow[ByteString, ByteString, ByteString, ByteString, NotUsed] = { - val decoder = lengthField(4, 0, maximumMessageLength + 4, ByteOrder.BIG_ENDIAN).map(_.drop(4)) - val encoder = Flow[ByteString].transform(() ⇒ new PushStage[ByteString, ByteString] { + BidiFlow.fromFlowsMat(simpleFramingProtocolEncoder(maximumMessageLength), simpleFramingProtocolDecoder(maximumMessageLength))(Keep.left) + } + + /** + * Protocol decoder that is used by [[Framing#simpleFramingProtocol]] + */ + def simpleFramingProtocolDecoder(maximumMessageLength: Int): Flow[ByteString, ByteString, NotUsed] = + lengthField(4, 0, maximumMessageLength + 4, ByteOrder.BIG_ENDIAN).map(_.drop(4)) + + /** + * Protocol encoder that is used by [[Framing#simpleFramingProtocol]] + */ + def simpleFramingProtocolEncoder(maximumMessageLength: Int): Flow[ByteString, ByteString, NotUsed] = + Flow[ByteString].transform(() ⇒ new PushStage[ByteString, ByteString] { override def onPush(message: ByteString, ctx: Context[ByteString]): SyncDirective = { val msgSize = message.size if (msgSize > maximumMessageLength) @@ -87,9 +99,6 @@ object Framing { } }) - BidiFlow.fromFlowsMat(encoder, decoder)(Keep.left) - } - class FramingException(msg: String) extends RuntimeException(msg) private final val bigEndianDecoder: (ByteIterator, Int) ⇒ Int = (bs, length) ⇒ {