diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/ws/MessageToFrameRenderer.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/ws/MessageToFrameRenderer.scala index 2ca85961d8..3f35812b07 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/ws/MessageToFrameRenderer.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/ws/MessageToFrameRenderer.scala @@ -32,7 +32,7 @@ private[http] object MessageToFrameRenderer { case BinaryMessage.Strict(data) ⇒ strictFrames(Opcode.Binary, data) case bm: BinaryMessage ⇒ streamedFrames(Opcode.Binary, bm.dataStream) case TextMessage.Strict(text) ⇒ strictFrames(Opcode.Text, ByteString(text, "UTF-8")) - case tm: TextMessage ⇒ streamedFrames(Opcode.Text, tm.textStream.transform(() ⇒ new Utf8Encoder)) + case tm: TextMessage ⇒ streamedFrames(Opcode.Text, tm.textStream.via(Utf8Encoder)) } } } diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/ws/Utf8Encoder.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/ws/Utf8Encoder.scala index db8e8fab22..d7b7ec3270 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/ws/Utf8Encoder.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/ws/Utf8Encoder.scala @@ -4,6 +4,7 @@ package akka.http.impl.engine.ws +import akka.stream.{ Outlet, Inlet, FlowShape, Attributes } import akka.stream.stage._ import akka.util.{ ByteStringBuilder, ByteString } @@ -12,63 +13,7 @@ import akka.util.{ ByteStringBuilder, ByteString } * * INTERNAL API */ -private[http] class Utf8Encoder extends PushStage[String, ByteString] { - import Utf8Encoder._ - - var surrogateValue: Int = 0 - def inSurrogatePair: Boolean = surrogateValue != 0 - - def onPush(input: String, ctx: Context[ByteString]): SyncDirective = { - val builder = new ByteStringBuilder - - def b(v: Int): Unit = { - builder += v.toByte - } - - def step(char: Int): Unit = - if (!inSurrogatePair) - if (char <= Utf8OneByteLimit) builder += char.toByte - else if (char <= Utf8TwoByteLimit) { - b(0xc0 | ((char & 0x7c0) >> 6)) // upper 5 bits - b(0x80 | (char & 0x3f)) // lower 6 bits - } else if (char >= SurrogateFirst && char < SurrogateSecond) - surrogateValue = 0x10000 | ((char ^ SurrogateFirst) << 10) - else if (char >= SurrogateSecond && char < 0xdfff) - throw new IllegalArgumentException(f"Unexpected UTF-16 surrogate continuation") - else if (char <= Utf8ThreeByteLimit) { - b(0xe0 | ((char & 0xf000) >> 12)) // upper 4 bits - b(0x80 | ((char & 0x0fc0) >> 6)) // middle 6 bits - b(0x80 | (char & 0x3f)) // lower 6 bits - } else - throw new IllegalStateException("Char cannot be >= 2^16") // char value was converted from 16bit value - else if (char >= SurrogateSecond && char <= 0xdfff) { - surrogateValue |= (char & 0x3ff) - b(0xf0 | ((surrogateValue & 0x1c0000) >> 18)) // upper 3 bits - b(0x80 | ((surrogateValue & 0x3f000) >> 12)) // first middle 6 bits - b(0x80 | ((surrogateValue & 0x0fc0) >> 6)) // second middle 6 bits - b(0x80 | (surrogateValue & 0x3f)) // lower 6 bits - surrogateValue = 0 - } else throw new IllegalArgumentException(f"Expected UTF-16 surrogate continuation") - - var offset = 0 - while (offset < input.length) { - step(input(offset)) - offset += 1 - } - - if (builder.length > 0) ctx.push(builder.result()) - else ctx.pull() - } - - override def onUpstreamFinish(ctx: Context[ByteString]): TerminationDirective = - if (inSurrogatePair) ctx.fail(new IllegalArgumentException("Truncated String input (ends in the middle of surrogate pair)")) - else super.onUpstreamFinish(ctx) -} - -/** - * INTERNAL API - */ -private[http] object Utf8Encoder { +private[http] object Utf8Encoder extends GraphStage[FlowShape[String, ByteString]] { val SurrogateFirst = 0xd800 val SurrogateSecond = 0xdc00 @@ -77,4 +22,67 @@ private[http] object Utf8Encoder { val Utf8ThreeByteLimit = lowerNBitsSet(16) def lowerNBitsSet(n: Int): Long = (1L << n) - 1 + + val in = Inlet[String]("Utf8Encoder.in") + val out = Outlet[ByteString]("Utf8Encoder.out") + override val shape = FlowShape(in, out) + override val initialAttributes: Attributes = Attributes.name("utf8Encoder") + + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler { + var surrogateValue: Int = 0 + def inSurrogatePair: Boolean = surrogateValue != 0 + + override def onPush(): Unit = { + val builder = new ByteStringBuilder + + def b(v: Int): Unit = { + builder += v.toByte + } + + def step(char: Int): Unit = + if (!inSurrogatePair) + if (char <= Utf8OneByteLimit) builder += char.toByte + else if (char <= Utf8TwoByteLimit) { + b(0xc0 | ((char & 0x7c0) >> 6)) // upper 5 bits + b(0x80 | (char & 0x3f)) // lower 6 bits + } else if (char >= SurrogateFirst && char < SurrogateSecond) + surrogateValue = 0x10000 | ((char ^ SurrogateFirst) << 10) + else if (char >= SurrogateSecond && char < 0xdfff) + throw new IllegalArgumentException(f"Unexpected UTF-16 surrogate continuation") + else if (char <= Utf8ThreeByteLimit) { + b(0xe0 | ((char & 0xf000) >> 12)) // upper 4 bits + b(0x80 | ((char & 0x0fc0) >> 6)) // middle 6 bits + b(0x80 | (char & 0x3f)) // lower 6 bits + } else + throw new IllegalStateException("Char cannot be >= 2^16") // char value was converted from 16bit value + else if (char >= SurrogateSecond && char <= 0xdfff) { + surrogateValue |= (char & 0x3ff) + b(0xf0 | ((surrogateValue & 0x1c0000) >> 18)) // upper 3 bits + b(0x80 | ((surrogateValue & 0x3f000) >> 12)) // first middle 6 bits + b(0x80 | ((surrogateValue & 0x0fc0) >> 6)) // second middle 6 bits + b(0x80 | (surrogateValue & 0x3f)) // lower 6 bits + surrogateValue = 0 + } else throw new IllegalArgumentException(f"Expected UTF-16 surrogate continuation") + + var offset = 0 + val input = grab(in) + while (offset < input.length) { + step(input(offset)) + offset += 1 + } + + if (builder.length > 0) push(out, builder.result()) + else pull(in) + } + + override def onUpstreamFinish(): Unit = + if (inSurrogatePair) failStage(new IllegalArgumentException("Truncated String input (ends in the middle of surrogate pair)")) + else completeStage() + + override def onPull(): Unit = pull(in) + + setHandlers(in, out, this) + } + + override def toString: String = "Utf8Encoder" } \ No newline at end of file diff --git a/akka-http-core/src/test/scala/akka/http/impl/engine/ws/Utf8CodingSpecs.scala b/akka-http-core/src/test/scala/akka/http/impl/engine/ws/Utf8CodingSpecs.scala index 0a4fe0cd04..c15fe00969 100644 --- a/akka-http-core/src/test/scala/akka/http/impl/engine/ws/Utf8CodingSpecs.scala +++ b/akka-http-core/src/test/scala/akka/http/impl/engine/ws/Utf8CodingSpecs.scala @@ -42,7 +42,7 @@ class Utf8CodingSpecs extends FreeSpec with Matchers with PropertyChecks with Wi def encodeUtf8(str: String): ByteString = Source(str.map(ch ⇒ new String(Array(ch)))) // chunk in smallest chunks possible - .transform(() ⇒ new Utf8Encoder) + .via(Utf8Encoder) .runFold(ByteString.empty)(_ ++ _).awaitResult(1.second) def decodeUtf8(bytes: ByteString): String = { diff --git a/project/MiMa.scala b/project/MiMa.scala index f4ad109b29..a67a9cbae3 100644 --- a/project/MiMa.scala +++ b/project/MiMa.scala @@ -3,6 +3,7 @@ */ package akka +import akka.MiMa.FilterAnyProblemStartingWith import sbt._ import sbt.Keys._ import com.typesafe.tools.mima.plugin.MimaPlugin @@ -645,37 +646,19 @@ object MiMa extends AutoPlugin { ProblemFilters.exclude[MissingMethodProblem]("akka.pattern.FutureTimeoutSupport.afterCompletionStage") ), "2.4.2" -> Seq( - FilterAnyProblemStartingWith("akka.stream.impl.VirtualProcessor"), - ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.stream.impl.fusing.GraphInterpreter.execute"), - ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.fusing.GraphInterpreter.this"), - ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.fusing.GraphInterpreterShell.init"), - ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.fusing.GraphInterpreterShell.receive"), - ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.Stages$Drop"), - ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.stream.impl.MaterializerSession.assignPort"), - ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.Stages$Drop$"), - ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.FanIn#InputBunch.dequeuePrefering"), - ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.fusing.Fusing#BuildStructuralInfo.registerInteral"), - ProblemFilters.exclude[MissingTypesProblem]("akka.stream.impl.fusing.Drop"), - ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.fusing.Drop.onPush"), + //internal API + FilterAnyProblemStartingWith("akka.http.impl"), + FilterAnyProblemStartingWith("akka.stream.impl"), + ProblemFilters.exclude[FinalClassProblem]("akka.stream.stage.GraphStageLogic$Reading"), // this class is private // lifting this method to the type where it belongs ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOpsMat.mapMaterializedValue"), - // #19908 Take is private - ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.Stages$Take$"), - ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.Stages$Take"), - ProblemFilters.exclude[MissingTypesProblem]("akka.stream.impl.fusing.Take"), - ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.fusing.Take.onPush"), - ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.fusing.Take.onPull"), - // #19815 make HTTP compile under Scala 2.12.0-M3 ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.http.scaladsl.model.headers.CacheDirectives#private.apply"), ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.http.scaladsl.model.headers.CacheDirectives#no-cache.apply"), - // #19913 internal and shouldn't be public - FilterAnyProblemStartingWith("akka.http.impl"), - // #19983 withoutSizeLimit overrides for Scala API ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.http.scaladsl.model.RequestEntity.withoutSizeLimit"), ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.http.scaladsl.model.UniversalEntity.withoutSizeLimit"), @@ -722,30 +705,10 @@ object MiMa extends AutoPlugin { // #19849 content negotiation fixes ProblemFilters.exclude[FinalClassProblem]("akka.http.scaladsl.marshalling.Marshal$UnacceptableResponseContentTypeException"), - // #20009 internal and shouldn't have been public - ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.QueueSource.completion"), - - // #20015 simplify materialized value computation tree - ProblemFilters.exclude[FinalMethodProblem]("akka.stream.impl.StreamLayout#AtomicModule.subModules"), - ProblemFilters.exclude[FinalMethodProblem]("akka.stream.impl.StreamLayout#AtomicModule.downstreams"), - ProblemFilters.exclude[FinalMethodProblem]("akka.stream.impl.StreamLayout#AtomicModule.upstreams"), - ProblemFilters.exclude[FinalMethodProblem]("akka.stream.impl.Stages#DirectProcessor.toString"), - ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.stream.impl.MaterializerSession.materializeAtomic"), - ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.impl.MaterializerSession.materializeAtomic"), - ProblemFilters.exclude[MissingTypesProblem]("akka.stream.impl.Stages$StageModule"), - ProblemFilters.exclude[FinalMethodProblem]("akka.stream.impl.Stages#GroupBy.toString"), - ProblemFilters.exclude[MissingTypesProblem]("akka.stream.impl.FlowModule"), - ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.FlowModule.subModules"), - ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.impl.FlowModule.label"), - ProblemFilters.exclude[FinalClassProblem]("akka.stream.impl.fusing.GraphModule"), - // #15947 catch mailbox creation failures ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.RepointableActorRef.point"), ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.actor.dungeon.Dispatch.initWithFailure"), - // #19877 Source.queue termination support - ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.stream.impl.SourceQueueAdapter.this"), - // #19828 ProblemFilters.exclude[DirectAbstractMethodProblem]("akka.persistence.Eventsourced#ProcessingState.onWriteMessageComplete"), ProblemFilters.exclude[ReversedAbstractMethodProblem]("akka.persistence.Eventsourced#ProcessingState.onWriteMessageComplete"),