stream: Unencoded attribute name (#28955)
This commit is contained in:
parent
eb1cb6ebd9
commit
4f2e82e5f6
3 changed files with 23 additions and 13 deletions
|
|
@ -12,6 +12,8 @@ import scala.annotation.tailrec
|
||||||
import scala.compat.java8.OptionConverters._
|
import scala.compat.java8.OptionConverters._
|
||||||
import scala.concurrent.duration.FiniteDuration
|
import scala.concurrent.duration.FiniteDuration
|
||||||
import scala.reflect.{ classTag, ClassTag }
|
import scala.reflect.{ classTag, ClassTag }
|
||||||
|
import akka.japi.function
|
||||||
|
import java.time.Duration
|
||||||
|
|
||||||
import akka.annotation.ApiMayChange
|
import akka.annotation.ApiMayChange
|
||||||
import akka.annotation.DoNotInherit
|
import akka.annotation.DoNotInherit
|
||||||
|
|
@ -19,8 +21,13 @@ import akka.annotation.InternalApi
|
||||||
import akka.event.Logging
|
import akka.event.Logging
|
||||||
import akka.japi.function
|
import akka.japi.function
|
||||||
import akka.stream.impl.TraversalBuilder
|
import akka.stream.impl.TraversalBuilder
|
||||||
import akka.util.{ ByteString, OptionVal }
|
|
||||||
import akka.util.JavaDurationConverters._
|
import akka.util.JavaDurationConverters._
|
||||||
|
import akka.util.JavaDurationConverters._
|
||||||
|
|
||||||
|
import scala.compat.java8.OptionConverters._
|
||||||
|
import akka.util.{ ByteString, OptionVal }
|
||||||
|
|
||||||
|
import scala.concurrent.duration.FiniteDuration
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Holds attributes which can be used to alter [[akka.stream.scaladsl.Flow]] / [[akka.stream.javadsl.Flow]]
|
* Holds attributes which can be used to alter [[akka.stream.scaladsl.Flow]] / [[akka.stream.javadsl.Flow]]
|
||||||
|
|
@ -175,15 +182,23 @@ final case class Attributes(attributeList: List[Attributes.Attribute] = Nil) {
|
||||||
/**
|
/**
|
||||||
* INTERNAL API
|
* INTERNAL API
|
||||||
*/
|
*/
|
||||||
@InternalApi def nameOrDefault(default: String = "unnamed"): String = {
|
@InternalApi private def getName(): Option[String] = {
|
||||||
@tailrec def find(attrs: List[Attribute]): String = attrs match {
|
@tailrec def find(attrs: List[Attribute]): Option[String] = attrs match {
|
||||||
case Attributes.Name(name) :: _ => name
|
case Attributes.Name(name) :: _ => Some(name)
|
||||||
case _ :: tail => find(tail)
|
case _ :: tail => find(tail)
|
||||||
case Nil => default
|
case Nil => None
|
||||||
}
|
}
|
||||||
find(attributeList)
|
find(attributeList)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@InternalApi def nameOrDefault(default: String = "unnamed"): String = {
|
||||||
|
getName().getOrElse(default)
|
||||||
|
}
|
||||||
|
|
||||||
|
@InternalApi private[akka] def nameForActorRef(default: String = "unnamed"): String = {
|
||||||
|
getName().map(name => URLEncoder.encode(name, ByteString.UTF_8)).getOrElse(default)
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test whether the given attribute is contained within this attributes list.
|
* Test whether the given attribute is contained within this attributes list.
|
||||||
*
|
*
|
||||||
|
|
@ -557,14 +572,10 @@ object Attributes {
|
||||||
/**
|
/**
|
||||||
* Specifies the name of the operation.
|
* Specifies the name of the operation.
|
||||||
* If the name is null or empty the name is ignored, i.e. [[#none]] is returned.
|
* If the name is null or empty the name is ignored, i.e. [[#none]] is returned.
|
||||||
*
|
|
||||||
* When using this method the name is encoded with URLEncoder with UTF-8 because
|
|
||||||
* the name is sometimes used as part of actor name. If that is not desired
|
|
||||||
* the name can be added in it's raw format using `.addAttributes(Attributes(Name(name)))`.
|
|
||||||
*/
|
*/
|
||||||
def name(name: String): Attributes =
|
def name(name: String): Attributes =
|
||||||
if (name == null || name.isEmpty) none
|
if (name == null || name.isEmpty) none
|
||||||
else Attributes(Name(URLEncoder.encode(name, ByteString.UTF_8)))
|
else Attributes(Name(name))
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Each asynchronous piece of a materialized stream topology is executed by one Actor
|
* Each asynchronous piece of a materialized stream topology is executed by one Actor
|
||||||
|
|
|
||||||
|
|
@ -49,8 +49,7 @@ private object ActorRefSource {
|
||||||
}
|
}
|
||||||
private var isCompleting: Boolean = false
|
private var isCompleting: Boolean = false
|
||||||
|
|
||||||
override protected def stageActorName: String =
|
override protected def stageActorName: String = inheritedAttributes.nameForActorRef(super.stageActorName)
|
||||||
inheritedAttributes.get[Attributes.Name].map(_.n).getOrElse(super.stageActorName)
|
|
||||||
|
|
||||||
private val name = inheritedAttributes.nameOrDefault(getClass.toString)
|
private val name = inheritedAttributes.nameOrDefault(getClass.toString)
|
||||||
override val ref: ActorRef = getEagerStageActor(eagerMaterializer, poisonPillCompatibility = true) {
|
override val ref: ActorRef = getEagerStageActor(eagerMaterializer, poisonPillCompatibility = true) {
|
||||||
|
|
|
||||||
|
|
@ -706,7 +706,7 @@ private final case class SavedIslandData(
|
||||||
logic.stageId = logics.size() - 1
|
logic.stageId = logics.size() - 1
|
||||||
fullIslandName match {
|
fullIslandName match {
|
||||||
case OptionVal.Some(_) => // already set
|
case OptionVal.Some(_) => // already set
|
||||||
case OptionVal.None => fullIslandName = OptionVal.Some(islandName + "-" + logic.attributes.nameOrDefault())
|
case OptionVal.None => fullIslandName = OptionVal.Some(islandName + "-" + logic.attributes.nameForActorRef())
|
||||||
}
|
}
|
||||||
matAndLogic
|
matAndLogic
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue