From 4f2e82e5f6bf1e25b382b3442bbeb6d2d1d4c28f Mon Sep 17 00:00:00 2001 From: contrun Date: Fri, 12 Jun 2020 19:57:35 +0800 Subject: [PATCH] stream: Unencoded attribute name (#28955) --- .../main/scala/akka/stream/Attributes.scala | 31 +++++++++++++------ .../akka/stream/impl/ActorRefSource.scala | 3 +- .../impl/PhasedFusingActorMaterializer.scala | 2 +- 3 files changed, 23 insertions(+), 13 deletions(-) diff --git a/akka-stream/src/main/scala/akka/stream/Attributes.scala b/akka-stream/src/main/scala/akka/stream/Attributes.scala index 72f09ea988..76af695648 100644 --- a/akka-stream/src/main/scala/akka/stream/Attributes.scala +++ b/akka-stream/src/main/scala/akka/stream/Attributes.scala @@ -12,6 +12,8 @@ import scala.annotation.tailrec import scala.compat.java8.OptionConverters._ import scala.concurrent.duration.FiniteDuration import scala.reflect.{ classTag, ClassTag } +import akka.japi.function +import java.time.Duration import akka.annotation.ApiMayChange import akka.annotation.DoNotInherit @@ -19,8 +21,13 @@ import akka.annotation.InternalApi import akka.event.Logging import akka.japi.function import akka.stream.impl.TraversalBuilder -import akka.util.{ ByteString, OptionVal } import akka.util.JavaDurationConverters._ +import akka.util.JavaDurationConverters._ + +import scala.compat.java8.OptionConverters._ +import akka.util.{ ByteString, OptionVal } + +import scala.concurrent.duration.FiniteDuration /** * Holds attributes which can be used to alter [[akka.stream.scaladsl.Flow]] / [[akka.stream.javadsl.Flow]] @@ -175,15 +182,23 @@ final case class Attributes(attributeList: List[Attributes.Attribute] = Nil) { /** * INTERNAL API */ - @InternalApi def nameOrDefault(default: String = "unnamed"): String = { - @tailrec def find(attrs: List[Attribute]): String = attrs match { - case Attributes.Name(name) :: _ => name + @InternalApi private def getName(): Option[String] = { + @tailrec def find(attrs: List[Attribute]): Option[String] = attrs match { + case Attributes.Name(name) :: _ => Some(name) case _ :: tail => find(tail) - case Nil => default + case Nil => None } find(attributeList) } + @InternalApi def nameOrDefault(default: String = "unnamed"): String = { + getName().getOrElse(default) + } + + @InternalApi private[akka] def nameForActorRef(default: String = "unnamed"): String = { + getName().map(name => URLEncoder.encode(name, ByteString.UTF_8)).getOrElse(default) + } + /** * Test whether the given attribute is contained within this attributes list. * @@ -557,14 +572,10 @@ object Attributes { /** * Specifies the name of the operation. * If the name is null or empty the name is ignored, i.e. [[#none]] is returned. - * - * When using this method the name is encoded with URLEncoder with UTF-8 because - * the name is sometimes used as part of actor name. If that is not desired - * the name can be added in it's raw format using `.addAttributes(Attributes(Name(name)))`. */ def name(name: String): Attributes = if (name == null || name.isEmpty) none - else Attributes(Name(URLEncoder.encode(name, ByteString.UTF_8))) + else Attributes(Name(name)) /** * Each asynchronous piece of a materialized stream topology is executed by one Actor diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorRefSource.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorRefSource.scala index fcd8a7daa0..4de7027d2c 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorRefSource.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorRefSource.scala @@ -49,8 +49,7 @@ private object ActorRefSource { } private var isCompleting: Boolean = false - override protected def stageActorName: String = - inheritedAttributes.get[Attributes.Name].map(_.n).getOrElse(super.stageActorName) + override protected def stageActorName: String = inheritedAttributes.nameForActorRef(super.stageActorName) private val name = inheritedAttributes.nameOrDefault(getClass.toString) override val ref: ActorRef = getEagerStageActor(eagerMaterializer, poisonPillCompatibility = true) { diff --git a/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala b/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala index 9fea322785..0b87a43d36 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala @@ -706,7 +706,7 @@ private final case class SavedIslandData( logic.stageId = logics.size() - 1 fullIslandName match { case OptionVal.Some(_) => // already set - case OptionVal.None => fullIslandName = OptionVal.Some(islandName + "-" + logic.attributes.nameOrDefault()) + case OptionVal.None => fullIslandName = OptionVal.Some(islandName + "-" + logic.attributes.nameForActorRef()) } matAndLogic }