=htp framed entity streaming cleanup, renames
This commit is contained in:
parent
c3308149be
commit
f2419f5a08
9 changed files with 37 additions and 37 deletions
|
|
@ -91,7 +91,7 @@ object ConsistentHashingRouter {
|
||||||
* INTERNAL API
|
* INTERNAL API
|
||||||
*/
|
*/
|
||||||
private[akka] def hashMappingAdapter(mapper: ConsistentHashMapper): ConsistentHashMapping = {
|
private[akka] def hashMappingAdapter(mapper: ConsistentHashMapper): ConsistentHashMapping = {
|
||||||
case message if (mapper.hashKey(message).asInstanceOf[AnyRef] ne null) ⇒
|
case message if mapper.hashKey(message).asInstanceOf[AnyRef] ne null ⇒
|
||||||
mapper.hashKey(message)
|
mapper.hashKey(message)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -5,7 +5,7 @@ package akka.stream
|
||||||
|
|
||||||
import java.util.concurrent.TimeUnit
|
import java.util.concurrent.TimeUnit
|
||||||
|
|
||||||
import akka.stream.impl.JsonBracketCounting
|
import akka.stream.impl.JsonObjectParser
|
||||||
import akka.util.ByteString
|
import akka.util.ByteString
|
||||||
import org.openjdk.jmh.annotations._
|
import org.openjdk.jmh.annotations._
|
||||||
|
|
||||||
|
|
@ -35,7 +35,7 @@ class JsonFramingBenchmark {
|
||||||
|{"fname":"Bob","name":"Smith","age":42,"id":1337,"boardMember":false},
|
|{"fname":"Bob","name":"Smith","age":42,"id":1337,"boardMember":false},
|
||||||
|{"fname":"Hank","name":"Smith","age":42,"id":1337,"boardMember":false}""".stripMargin)
|
|{"fname":"Hank","name":"Smith","age":42,"id":1337,"boardMember":false}""".stripMargin)
|
||||||
|
|
||||||
val bracket = new JsonBracketCounting
|
val bracket = new JsonObjectParser
|
||||||
|
|
||||||
@Setup(Level.Invocation)
|
@Setup(Level.Invocation)
|
||||||
def init(): Unit = {
|
def init(): Unit = {
|
||||||
|
|
|
||||||
|
|
@ -1,9 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
|
|
||||||
*/
|
|
||||||
package akka.http.javadsl.server
|
|
||||||
|
|
||||||
class JsonEntityStreaming {
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
@ -61,7 +61,8 @@ object StrictForm {
|
||||||
fsu(value.entity.data.decodeString(charsetName))
|
fsu(value.entity.data.decodeString(charsetName))
|
||||||
})
|
})
|
||||||
|
|
||||||
@implicitNotFound(s"In order to unmarshal a `StrictForm.Field` to type `$${T}` you need to supply a " +
|
@implicitNotFound(msg =
|
||||||
|
s"In order to unmarshal a `StrictForm.Field` to type `$${T}` you need to supply a " +
|
||||||
s"`FromStringUnmarshaller[$${T}]` and/or a `FromEntityUnmarshaller[$${T}]`")
|
s"`FromStringUnmarshaller[$${T}]` and/or a `FromEntityUnmarshaller[$${T}]`")
|
||||||
sealed trait FieldUnmarshaller[T] {
|
sealed trait FieldUnmarshaller[T] {
|
||||||
def unmarshalString(value: String)(implicit ec: ExecutionContext, mat: Materializer): Future[T]
|
def unmarshalString(value: String)(implicit ec: ExecutionContext, mat: Materializer): Future[T]
|
||||||
|
|
|
||||||
|
|
@ -6,9 +6,15 @@ package akka.http.scaladsl.unmarshalling
|
||||||
|
|
||||||
import scala.collection.immutable
|
import scala.collection.immutable
|
||||||
import akka.http.scaladsl.util.FastFuture
|
import akka.http.scaladsl.util.FastFuture
|
||||||
|
import akka.util.ByteString
|
||||||
|
|
||||||
trait PredefinedFromStringUnmarshallers {
|
trait PredefinedFromStringUnmarshallers {
|
||||||
|
|
||||||
|
implicit def _fromStringUnmarshallerFromByteStringUnmarshaller[T](implicit bsum: FromByteStringUnmarshaller[T]): Unmarshaller[String, T] = {
|
||||||
|
val bs = Unmarshaller.strict[String, ByteString](s ⇒ ByteString(s))
|
||||||
|
bs.flatMap(implicit ec ⇒ implicit mat ⇒ bsum(_))
|
||||||
|
}
|
||||||
|
|
||||||
implicit val byteFromStringUnmarshaller: Unmarshaller[String, Byte] =
|
implicit val byteFromStringUnmarshaller: Unmarshaller[String, Byte] =
|
||||||
numberUnmarshaller(_.toByte, "8-bit signed integer")
|
numberUnmarshaller(_.toByte, "8-bit signed integer")
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -6,6 +6,7 @@ package akka.http.scaladsl
|
||||||
|
|
||||||
import akka.http.scaladsl.common.StrictForm
|
import akka.http.scaladsl.common.StrictForm
|
||||||
import akka.http.scaladsl.model._
|
import akka.http.scaladsl.model._
|
||||||
|
import akka.util.ByteString
|
||||||
|
|
||||||
package object unmarshalling {
|
package object unmarshalling {
|
||||||
//# unmarshaller-aliases
|
//# unmarshaller-aliases
|
||||||
|
|
@ -13,6 +14,7 @@ package object unmarshalling {
|
||||||
type FromMessageUnmarshaller[T] = Unmarshaller[HttpMessage, T]
|
type FromMessageUnmarshaller[T] = Unmarshaller[HttpMessage, T]
|
||||||
type FromResponseUnmarshaller[T] = Unmarshaller[HttpResponse, T]
|
type FromResponseUnmarshaller[T] = Unmarshaller[HttpResponse, T]
|
||||||
type FromRequestUnmarshaller[T] = Unmarshaller[HttpRequest, T]
|
type FromRequestUnmarshaller[T] = Unmarshaller[HttpRequest, T]
|
||||||
|
type FromByteStringUnmarshaller[T] = Unmarshaller[ByteString, T]
|
||||||
type FromStringUnmarshaller[T] = Unmarshaller[String, T]
|
type FromStringUnmarshaller[T] = Unmarshaller[String, T]
|
||||||
type FromStrictFormFieldUnmarshaller[T] = Unmarshaller[StrictForm.Field, T]
|
type FromStrictFormFieldUnmarshaller[T] = Unmarshaller[StrictForm.Field, T]
|
||||||
//#
|
//#
|
||||||
|
|
|
||||||
|
|
@ -4,7 +4,7 @@
|
||||||
package akka.stream.scaladsl
|
package akka.stream.scaladsl
|
||||||
|
|
||||||
import akka.stream.ActorMaterializer
|
import akka.stream.ActorMaterializer
|
||||||
import akka.stream.impl.JsonBracketCounting
|
import akka.stream.impl.JsonObjectParser
|
||||||
import akka.stream.scaladsl.Framing.FramingException
|
import akka.stream.scaladsl.Framing.FramingException
|
||||||
import akka.stream.scaladsl.{ JsonFraming, Framing, Source }
|
import akka.stream.scaladsl.{ JsonFraming, Framing, Source }
|
||||||
import akka.stream.testkit.scaladsl.TestSink
|
import akka.stream.testkit.scaladsl.TestSink
|
||||||
|
|
@ -124,7 +124,7 @@ class JsonFramingSpec extends AkkaSpec {
|
||||||
"collecting json buffer" when {
|
"collecting json buffer" when {
|
||||||
"nothing is supplied" should {
|
"nothing is supplied" should {
|
||||||
"return nothing" in {
|
"return nothing" in {
|
||||||
val buffer = new JsonBracketCounting()
|
val buffer = new JsonObjectParser()
|
||||||
buffer.poll() should ===(None)
|
buffer.poll() should ===(None)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -132,25 +132,25 @@ class JsonFramingSpec extends AkkaSpec {
|
||||||
"valid json is supplied" which {
|
"valid json is supplied" which {
|
||||||
"has one object" should {
|
"has one object" should {
|
||||||
"successfully parse empty object" in {
|
"successfully parse empty object" in {
|
||||||
val buffer = new JsonBracketCounting()
|
val buffer = new JsonObjectParser()
|
||||||
buffer.offer(ByteString("""{}"""))
|
buffer.offer(ByteString("""{}"""))
|
||||||
buffer.poll().get.utf8String shouldBe """{}"""
|
buffer.poll().get.utf8String shouldBe """{}"""
|
||||||
}
|
}
|
||||||
|
|
||||||
"successfully parse single field having string value" in {
|
"successfully parse single field having string value" in {
|
||||||
val buffer = new JsonBracketCounting()
|
val buffer = new JsonObjectParser()
|
||||||
buffer.offer(ByteString("""{ "name": "john"}"""))
|
buffer.offer(ByteString("""{ "name": "john"}"""))
|
||||||
buffer.poll().get.utf8String shouldBe """{ "name": "john"}"""
|
buffer.poll().get.utf8String shouldBe """{ "name": "john"}"""
|
||||||
}
|
}
|
||||||
|
|
||||||
"successfully parse single field having string value containing space" in {
|
"successfully parse single field having string value containing space" in {
|
||||||
val buffer = new JsonBracketCounting()
|
val buffer = new JsonObjectParser()
|
||||||
buffer.offer(ByteString("""{ "name": "john doe"}"""))
|
buffer.offer(ByteString("""{ "name": "john doe"}"""))
|
||||||
buffer.poll().get.utf8String shouldBe """{ "name": "john doe"}"""
|
buffer.poll().get.utf8String shouldBe """{ "name": "john doe"}"""
|
||||||
}
|
}
|
||||||
|
|
||||||
"successfully parse single field having string value containing curly brace" in {
|
"successfully parse single field having string value containing curly brace" in {
|
||||||
val buffer = new JsonBracketCounting()
|
val buffer = new JsonObjectParser()
|
||||||
|
|
||||||
buffer.offer(ByteString("""{ "name": "john{"""))
|
buffer.offer(ByteString("""{ "name": "john{"""))
|
||||||
buffer.offer(ByteString("}"))
|
buffer.offer(ByteString("}"))
|
||||||
|
|
@ -161,7 +161,7 @@ class JsonFramingSpec extends AkkaSpec {
|
||||||
}
|
}
|
||||||
|
|
||||||
"successfully parse single field having string value containing curly brace and escape character" in {
|
"successfully parse single field having string value containing curly brace and escape character" in {
|
||||||
val buffer = new JsonBracketCounting()
|
val buffer = new JsonObjectParser()
|
||||||
|
|
||||||
buffer.offer(ByteString("""{ "name": "john"""))
|
buffer.offer(ByteString("""{ "name": "john"""))
|
||||||
buffer.offer(ByteString("\\\""))
|
buffer.offer(ByteString("\\\""))
|
||||||
|
|
@ -177,19 +177,19 @@ class JsonFramingSpec extends AkkaSpec {
|
||||||
}
|
}
|
||||||
|
|
||||||
"successfully parse single field having integer value" in {
|
"successfully parse single field having integer value" in {
|
||||||
val buffer = new JsonBracketCounting()
|
val buffer = new JsonObjectParser()
|
||||||
buffer.offer(ByteString("""{ "age": 101}"""))
|
buffer.offer(ByteString("""{ "age": 101}"""))
|
||||||
buffer.poll().get.utf8String shouldBe """{ "age": 101}"""
|
buffer.poll().get.utf8String shouldBe """{ "age": 101}"""
|
||||||
}
|
}
|
||||||
|
|
||||||
"successfully parse single field having decimal value" in {
|
"successfully parse single field having decimal value" in {
|
||||||
val buffer = new JsonBracketCounting()
|
val buffer = new JsonObjectParser()
|
||||||
buffer.offer(ByteString("""{ "age": 101}"""))
|
buffer.offer(ByteString("""{ "age": 101}"""))
|
||||||
buffer.poll().get.utf8String shouldBe """{ "age": 101}"""
|
buffer.poll().get.utf8String shouldBe """{ "age": 101}"""
|
||||||
}
|
}
|
||||||
|
|
||||||
"successfully parse single field having nested object" in {
|
"successfully parse single field having nested object" in {
|
||||||
val buffer = new JsonBracketCounting()
|
val buffer = new JsonObjectParser()
|
||||||
buffer.offer(ByteString(
|
buffer.offer(ByteString(
|
||||||
"""
|
"""
|
||||||
|{ "name": "john",
|
|{ "name": "john",
|
||||||
|
|
@ -210,7 +210,7 @@ class JsonFramingSpec extends AkkaSpec {
|
||||||
}
|
}
|
||||||
|
|
||||||
"successfully parse single field having multiple level of nested object" in {
|
"successfully parse single field having multiple level of nested object" in {
|
||||||
val buffer = new JsonBracketCounting()
|
val buffer = new JsonObjectParser()
|
||||||
buffer.offer(ByteString(
|
buffer.offer(ByteString(
|
||||||
"""
|
"""
|
||||||
|{ "name": "john",
|
|{ "name": "john",
|
||||||
|
|
@ -239,7 +239,7 @@ class JsonFramingSpec extends AkkaSpec {
|
||||||
|
|
||||||
"has nested array" should {
|
"has nested array" should {
|
||||||
"successfully parse" in {
|
"successfully parse" in {
|
||||||
val buffer = new JsonBracketCounting()
|
val buffer = new JsonObjectParser()
|
||||||
buffer.offer(ByteString(
|
buffer.offer(ByteString(
|
||||||
"""
|
"""
|
||||||
|{ "name": "john",
|
|{ "name": "john",
|
||||||
|
|
@ -264,7 +264,7 @@ class JsonFramingSpec extends AkkaSpec {
|
||||||
|
|
||||||
"has complex object graph" should {
|
"has complex object graph" should {
|
||||||
"successfully parse" in {
|
"successfully parse" in {
|
||||||
val buffer = new JsonBracketCounting()
|
val buffer = new JsonObjectParser()
|
||||||
buffer.offer(ByteString(
|
buffer.offer(ByteString(
|
||||||
"""
|
"""
|
||||||
|{
|
|{
|
||||||
|
|
@ -318,13 +318,13 @@ class JsonFramingSpec extends AkkaSpec {
|
||||||
|
|
||||||
"has multiple fields" should {
|
"has multiple fields" should {
|
||||||
"parse successfully" in {
|
"parse successfully" in {
|
||||||
val buffer = new JsonBracketCounting()
|
val buffer = new JsonObjectParser()
|
||||||
buffer.offer(ByteString("""{ "name": "john", "age": 101}"""))
|
buffer.offer(ByteString("""{ "name": "john", "age": 101}"""))
|
||||||
buffer.poll().get.utf8String shouldBe """{ "name": "john", "age": 101}"""
|
buffer.poll().get.utf8String shouldBe """{ "name": "john", "age": 101}"""
|
||||||
}
|
}
|
||||||
|
|
||||||
"parse successfully despite valid whitespaces around json" in {
|
"parse successfully despite valid whitespaces around json" in {
|
||||||
val buffer = new JsonBracketCounting()
|
val buffer = new JsonObjectParser()
|
||||||
buffer.offer(ByteString(
|
buffer.offer(ByteString(
|
||||||
"""
|
"""
|
||||||
|
|
|
|
||||||
|
|
@ -351,7 +351,7 @@ class JsonFramingSpec extends AkkaSpec {
|
||||||
| }
|
| }
|
||||||
""".stripMargin
|
""".stripMargin
|
||||||
|
|
||||||
val buffer = new JsonBracketCounting()
|
val buffer = new JsonObjectParser()
|
||||||
buffer.offer(ByteString(input))
|
buffer.offer(ByteString(input))
|
||||||
|
|
||||||
buffer.poll().get.utf8String shouldBe
|
buffer.poll().get.utf8String shouldBe
|
||||||
|
|
@ -375,7 +375,7 @@ class JsonFramingSpec extends AkkaSpec {
|
||||||
}
|
}
|
||||||
|
|
||||||
"returns none until valid json is encountered" in {
|
"returns none until valid json is encountered" in {
|
||||||
val buffer = new JsonBracketCounting()
|
val buffer = new JsonObjectParser()
|
||||||
|
|
||||||
"""{ "name": "john"""".foreach {
|
"""{ "name": "john"""".foreach {
|
||||||
c ⇒
|
c ⇒
|
||||||
|
|
@ -389,13 +389,13 @@ class JsonFramingSpec extends AkkaSpec {
|
||||||
|
|
||||||
"invalid json is supplied" should {
|
"invalid json is supplied" should {
|
||||||
"fail if it's broken from the start" in {
|
"fail if it's broken from the start" in {
|
||||||
val buffer = new JsonBracketCounting()
|
val buffer = new JsonObjectParser()
|
||||||
buffer.offer(ByteString("""THIS IS NOT VALID { "name": "john"}"""))
|
buffer.offer(ByteString("""THIS IS NOT VALID { "name": "john"}"""))
|
||||||
a[FramingException] shouldBe thrownBy { buffer.poll() }
|
a[FramingException] shouldBe thrownBy { buffer.poll() }
|
||||||
}
|
}
|
||||||
|
|
||||||
"fail if it's broken at the end" in {
|
"fail if it's broken at the end" in {
|
||||||
val buffer = new JsonBracketCounting()
|
val buffer = new JsonObjectParser()
|
||||||
buffer.offer(ByteString("""{ "name": "john"} THIS IS NOT VALID"""))
|
buffer.offer(ByteString("""{ "name": "john"} THIS IS NOT VALID"""))
|
||||||
buffer.poll() // first emitting the valid element
|
buffer.poll() // first emitting the valid element
|
||||||
a[FramingException] shouldBe thrownBy { buffer.poll() }
|
a[FramingException] shouldBe thrownBy { buffer.poll() }
|
||||||
|
|
|
||||||
|
|
@ -11,7 +11,7 @@ import scala.annotation.switch
|
||||||
/**
|
/**
|
||||||
* INTERNAL API: Use [[akka.stream.scaladsl.JsonFraming]] instead.
|
* INTERNAL API: Use [[akka.stream.scaladsl.JsonFraming]] instead.
|
||||||
*/
|
*/
|
||||||
private[akka] object JsonBracketCounting {
|
private[akka] object JsonObjectParser {
|
||||||
|
|
||||||
final val SquareBraceStart = '['.toByte
|
final val SquareBraceStart = '['.toByte
|
||||||
final val SquareBraceEnd = ']'.toByte
|
final val SquareBraceEnd = ']'.toByte
|
||||||
|
|
@ -42,8 +42,8 @@ private[akka] object JsonBracketCounting {
|
||||||
*
|
*
|
||||||
* Leading whitespace between elements will be trimmed.
|
* Leading whitespace between elements will be trimmed.
|
||||||
*/
|
*/
|
||||||
private[akka] class JsonBracketCounting(maximumObjectLength: Int = Int.MaxValue) {
|
private[akka] class JsonObjectParser(maximumObjectLength: Int = Int.MaxValue) {
|
||||||
import JsonBracketCounting._
|
import JsonObjectParser._
|
||||||
|
|
||||||
private var buffer: ByteString = ByteString.empty
|
private var buffer: ByteString = ByteString.empty
|
||||||
|
|
||||||
|
|
@ -5,7 +5,7 @@ package akka.stream.scaladsl
|
||||||
|
|
||||||
import akka.NotUsed
|
import akka.NotUsed
|
||||||
import akka.stream.Attributes
|
import akka.stream.Attributes
|
||||||
import akka.stream.impl.JsonBracketCounting
|
import akka.stream.impl.JsonObjectParser
|
||||||
import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage
|
import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage
|
||||||
import akka.stream.stage.{ InHandler, OutHandler, GraphStageLogic }
|
import akka.stream.stage.{ InHandler, OutHandler, GraphStageLogic }
|
||||||
import akka.util.ByteString
|
import akka.util.ByteString
|
||||||
|
|
@ -42,7 +42,7 @@ object JsonFraming {
|
||||||
*/
|
*/
|
||||||
def bracketCounting(maximumObjectLength: Int): Flow[ByteString, ByteString, NotUsed] =
|
def bracketCounting(maximumObjectLength: Int): Flow[ByteString, ByteString, NotUsed] =
|
||||||
Flow[ByteString].via(new SimpleLinearGraphStage[ByteString] {
|
Flow[ByteString].via(new SimpleLinearGraphStage[ByteString] {
|
||||||
private[this] val buffer = new JsonBracketCounting(maximumObjectLength)
|
private[this] val buffer = new JsonObjectParser(maximumObjectLength)
|
||||||
|
|
||||||
override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) with InHandler with OutHandler {
|
override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) with InHandler with OutHandler {
|
||||||
setHandlers(in, out, this)
|
setHandlers(in, out, this)
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue