=str #19834 Replave UTF8Encoder with GraphStage

This commit is contained in:
Alexander Golubev 2016-03-18 09:32:42 -04:00
parent 1596ae7f13
commit bcd52fb90a
4 changed files with 72 additions and 101 deletions

View file

@ -32,7 +32,7 @@ private[http] object MessageToFrameRenderer {
case BinaryMessage.Strict(data) strictFrames(Opcode.Binary, data)
case bm: BinaryMessage streamedFrames(Opcode.Binary, bm.dataStream)
case TextMessage.Strict(text) strictFrames(Opcode.Text, ByteString(text, "UTF-8"))
case tm: TextMessage streamedFrames(Opcode.Text, tm.textStream.transform(() new Utf8Encoder))
case tm: TextMessage streamedFrames(Opcode.Text, tm.textStream.via(Utf8Encoder))
}
}
}

View file

@ -4,6 +4,7 @@
package akka.http.impl.engine.ws
import akka.stream.{ Outlet, Inlet, FlowShape, Attributes }
import akka.stream.stage._
import akka.util.{ ByteStringBuilder, ByteString }
@ -12,13 +13,26 @@ import akka.util.{ ByteStringBuilder, ByteString }
*
* INTERNAL API
*/
private[http] class Utf8Encoder extends PushStage[String, ByteString] {
import Utf8Encoder._
private[http] object Utf8Encoder extends GraphStage[FlowShape[String, ByteString]] {
val SurrogateFirst = 0xd800
val SurrogateSecond = 0xdc00
val Utf8OneByteLimit = lowerNBitsSet(7)
val Utf8TwoByteLimit = lowerNBitsSet(11)
val Utf8ThreeByteLimit = lowerNBitsSet(16)
def lowerNBitsSet(n: Int): Long = (1L << n) - 1
val in = Inlet[String]("Utf8Encoder.in")
val out = Outlet[ByteString]("Utf8Encoder.out")
override val shape = FlowShape(in, out)
override val initialAttributes: Attributes = Attributes.name("utf8Encoder")
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler {
var surrogateValue: Int = 0
def inSurrogatePair: Boolean = surrogateValue != 0
def onPush(input: String, ctx: Context[ByteString]): SyncDirective = {
override def onPush(): Unit = {
val builder = new ByteStringBuilder
def b(v: Int): Unit = {
@ -51,30 +65,24 @@ private[http] class Utf8Encoder extends PushStage[String, ByteString] {
} else throw new IllegalArgumentException(f"Expected UTF-16 surrogate continuation")
var offset = 0
val input = grab(in)
while (offset < input.length) {
step(input(offset))
offset += 1
}
if (builder.length > 0) ctx.push(builder.result())
else ctx.pull()
if (builder.length > 0) push(out, builder.result())
else pull(in)
}
override def onUpstreamFinish(ctx: Context[ByteString]): TerminationDirective =
if (inSurrogatePair) ctx.fail(new IllegalArgumentException("Truncated String input (ends in the middle of surrogate pair)"))
else super.onUpstreamFinish(ctx)
}
/**
* INTERNAL API
*/
private[http] object Utf8Encoder {
val SurrogateFirst = 0xd800
val SurrogateSecond = 0xdc00
val Utf8OneByteLimit = lowerNBitsSet(7)
val Utf8TwoByteLimit = lowerNBitsSet(11)
val Utf8ThreeByteLimit = lowerNBitsSet(16)
def lowerNBitsSet(n: Int): Long = (1L << n) - 1
override def onUpstreamFinish(): Unit =
if (inSurrogatePair) failStage(new IllegalArgumentException("Truncated String input (ends in the middle of surrogate pair)"))
else completeStage()
override def onPull(): Unit = pull(in)
setHandlers(in, out, this)
}
override def toString: String = "Utf8Encoder"
}

View file

@ -42,7 +42,7 @@ class Utf8CodingSpecs extends FreeSpec with Matchers with PropertyChecks with Wi
def encodeUtf8(str: String): ByteString =
Source(str.map(ch new String(Array(ch)))) // chunk in smallest chunks possible
.transform(() new Utf8Encoder)
.via(Utf8Encoder)
.runFold(ByteString.empty)(_ ++ _).awaitResult(1.second)
def decodeUtf8(bytes: ByteString): String = {

View file

@ -3,6 +3,7 @@
*/
package akka
import akka.MiMa.FilterAnyProblemStartingWith
import sbt._
import sbt.Keys._
import com.typesafe.tools.mima.plugin.MimaPlugin
@ -645,37 +646,19 @@ object MiMa extends AutoPlugin {
ProblemFilters.exclude[MissingMethodProblem]("akka.pattern.FutureTimeoutSupport.afterCompletionStage")
),
"2.4.2" -> Seq(
FilterAnyProblemStartingWith("akka.stream.impl.VirtualProcessor"),
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.stream.impl.fusing.GraphInterpreter.execute"),
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.fusing.GraphInterpreter.this"),
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.fusing.GraphInterpreterShell.init"),
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.fusing.GraphInterpreterShell.receive"),
ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.Stages$Drop"),
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.stream.impl.MaterializerSession.assignPort"),
ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.Stages$Drop$"),
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.FanIn#InputBunch.dequeuePrefering"),
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.fusing.Fusing#BuildStructuralInfo.registerInteral"),
ProblemFilters.exclude[MissingTypesProblem]("akka.stream.impl.fusing.Drop"),
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.fusing.Drop.onPush"),
//internal API
FilterAnyProblemStartingWith("akka.http.impl"),
FilterAnyProblemStartingWith("akka.stream.impl"),
ProblemFilters.exclude[FinalClassProblem]("akka.stream.stage.GraphStageLogic$Reading"), // this class is private
// lifting this method to the type where it belongs
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOpsMat.mapMaterializedValue"),
// #19908 Take is private
ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.Stages$Take$"),
ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.Stages$Take"),
ProblemFilters.exclude[MissingTypesProblem]("akka.stream.impl.fusing.Take"),
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.fusing.Take.onPush"),
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.fusing.Take.onPull"),
// #19815 make HTTP compile under Scala 2.12.0-M3
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.http.scaladsl.model.headers.CacheDirectives#private.apply"),
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.http.scaladsl.model.headers.CacheDirectives#no-cache.apply"),
// #19913 internal and shouldn't be public
FilterAnyProblemStartingWith("akka.http.impl"),
// #19983 withoutSizeLimit overrides for Scala API
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.http.scaladsl.model.RequestEntity.withoutSizeLimit"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.http.scaladsl.model.UniversalEntity.withoutSizeLimit"),
@ -722,30 +705,10 @@ object MiMa extends AutoPlugin {
// #19849 content negotiation fixes
ProblemFilters.exclude[FinalClassProblem]("akka.http.scaladsl.marshalling.Marshal$UnacceptableResponseContentTypeException"),
// #20009 internal and shouldn't have been public
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.QueueSource.completion"),
// #20015 simplify materialized value computation tree
ProblemFilters.exclude[FinalMethodProblem]("akka.stream.impl.StreamLayout#AtomicModule.subModules"),
ProblemFilters.exclude[FinalMethodProblem]("akka.stream.impl.StreamLayout#AtomicModule.downstreams"),
ProblemFilters.exclude[FinalMethodProblem]("akka.stream.impl.StreamLayout#AtomicModule.upstreams"),
ProblemFilters.exclude[FinalMethodProblem]("akka.stream.impl.Stages#DirectProcessor.toString"),
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.stream.impl.MaterializerSession.materializeAtomic"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.impl.MaterializerSession.materializeAtomic"),
ProblemFilters.exclude[MissingTypesProblem]("akka.stream.impl.Stages$StageModule"),
ProblemFilters.exclude[FinalMethodProblem]("akka.stream.impl.Stages#GroupBy.toString"),
ProblemFilters.exclude[MissingTypesProblem]("akka.stream.impl.FlowModule"),
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.FlowModule.subModules"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.impl.FlowModule.label"),
ProblemFilters.exclude[FinalClassProblem]("akka.stream.impl.fusing.GraphModule"),
// #15947 catch mailbox creation failures
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.RepointableActorRef.point"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.actor.dungeon.Dispatch.initWithFailure"),
// #19877 Source.queue termination support
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.stream.impl.SourceQueueAdapter.this"),
// #19828
ProblemFilters.exclude[DirectAbstractMethodProblem]("akka.persistence.Eventsourced#ProcessingState.onWriteMessageComplete"),
ProblemFilters.exclude[ReversedAbstractMethodProblem]("akka.persistence.Eventsourced#ProcessingState.onWriteMessageComplete"),