=htc fix JsonFraming completion bug, exposed by new Interpreter

This commit is contained in:
Konrad Malawski 2016-08-02 15:56:39 +02:00
parent 3c2d021742
commit 8a1f8f27dc
4 changed files with 10 additions and 15 deletions

View file

@ -33,7 +33,8 @@ class JsonFramingBenchmark {
|{"fname":"Bob","name":"Smith","age":42,"id":1337,"boardMember":false},
|{"fname":"Bob","name":"Smith","age":42,"id":1337,"boardMember":false},
|{"fname":"Bob","name":"Smith","age":42,"id":1337,"boardMember":false},
|{"fname":"Hank","name":"Smith","age":42,"id":1337,"boardMember":false}""".stripMargin)
|{"fname":"Hank","name":"Smith","age":42,"id":1337,"boardMember":false}""".stripMargin
)
val bracket = new JsonObjectParser

View file

@ -47,22 +47,14 @@ trait FramedEntityStreamingDirectives extends MarshallingDirectives {
* Extracts entity as [[Source]] of elements of type `T`.
* This is achieved by applying the implicitly provided (in the following order):
*
* - 1st: [[FramingFlow]] in order to chunk-up the incoming [[ByteString]]s according to the
* `Content-Type` aware framing (for example, [[common.EntityStreamingSupport.bracketCountingJsonFraming]]).
* - 2nd: [[Unmarshaller]] (from [[ByteString]] to `T`) for each of the respective "chunks" (e.g. for each JSON element contained within an array).
* - 1st: chunk-up the incoming [[ByteString]]s by applying the `Content-Type`-aware framing
* - 2nd: apply the [[Unmarshaller]] (from [[ByteString]] to `T`) for each of the respective "chunks" (e.g. for each JSON element contained within an array).
*
* The request will be rejected with an [[akka.http.scaladsl.server.UnsupportedRequestContentTypeRejection]] if
* its [[ContentType]] is not supported by the used `framing` or `unmarshaller`.
*
* It is recommended to use the [[common.EntityStreamingSupport]] trait in conjunction with this
* directive as it helps provide the right [[FramingFlow]] and [[SourceRenderingMode]] for the most
* typical usage scenarios (JSON, CSV, ...).
*
* Cancelling extracted [[Source]] closes the connection abruptly (same as cancelling the `entity.dataBytes`).
*
* If looking to improve marshalling performance in face of many elements (possibly of different sizes),
* you may be interested in using [[asSourceOfAsyncUnordered]] instead.
*
* See also [[MiscDirectives.withoutSizeLimit]] as you may want to allow streaming infinite streams of data in this route.
* By default the uploaded data is limited by the `akka.http.parsing.max-content-length`.
*/

View file

@ -16,8 +16,6 @@ import scala.concurrent.duration._
class JsonFramingSpec extends AkkaSpec {
override implicit val patience = PatienceConfig(timeout = 10.seconds)
implicit val mat = ActorMaterializer()
"collecting multiple json" should {

View file

@ -56,8 +56,12 @@ object JsonFraming {
override def onPull(): Unit =
tryPopBuffer()
override def onUpstreamFinish(): Unit =
if (buffer.isEmpty) completeStage()
override def onUpstreamFinish(): Unit = {
try buffer.poll() match {
case Some(json) emit(out, json, () completeStage())
case _ completeStage()
}
}
def tryPopBuffer() = {
try buffer.poll() match {