Use the target actor name in the temporary ask actor name (#29245)

* Allows specifying the large message channel for the response using wildcard patterns in config
 * Makes debugging asks somewhat easier
This commit is contained in:
Johan Andrén 2020-07-08 17:43:52 +02:00 committed by GitHub
parent 2f424b13dd
commit df995fe7bd
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
16 changed files with 101 additions and 35 deletions

View file

@ -7,16 +7,19 @@ package akka.pattern
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Failure
import com.github.ghik.silencer.silent
import language.postfixOps
import language.postfixOps
import akka.actor._
import akka.testkit.WithLogCapturing
import akka.testkit.{ AkkaSpec, TestProbe }
import akka.util.Timeout
@silent
class AskSpec extends AkkaSpec {
class AskSpec extends AkkaSpec("""
akka.loglevel = DEBUG
akka.loggers = ["akka.testkit.SilenceAllTestEventListener"]
""") with WithLogCapturing {
"The “ask” pattern" must {
"send request to actor and wrap the answer in Future" in {
@ -232,6 +235,26 @@ class AskSpec extends AkkaSpec {
completed should ===("complete")
expectTerminated(promiseActorRef, 1.second)
}
"encode target name in temporary actor name" in {
implicit val timeout: Timeout = Timeout(300 millis)
val p = TestProbe()
val act = system.actorOf(Props(new Actor {
def receive = {
case msg => p.ref ! sender() -> msg
}
}), "myName")
(act ? "ask").mapTo[String]
val (promiseActorRef, "ask") = p.expectMsgType[(ActorRef, String)]
promiseActorRef.path.name should startWith("myName")
(system.actorSelection("/user/myName") ? "ask").mapTo[String]
val (promiseActorRefForSelection, "ask") = p.expectMsgType[(ActorRef, String)]
promiseActorRefForSelection.path.name should startWith("_user_myName")
}
}
}

View file

@ -0,0 +1,2 @@
# target actor name in the temporary ask actor name #29205, changes to ask internals
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.typed.scaladsl.AskPattern.AskPath")

View file

@ -23,4 +23,6 @@ import akka.annotation.InternalApi
*/
def isTerminated: Boolean
def refPrefix: String = toString
}

View file

@ -38,6 +38,8 @@ import akka.dispatch.sysmsg
// impl InternalRecipientRef
def isTerminated: Boolean = classicRef.isTerminated
override def refPrefix: String = path.name
@throws(classOf[java.io.ObjectStreamException])
private def writeReplace(): AnyRef = SerializedActorRef[T](this)
}

View file

@ -121,6 +121,8 @@ import akka.annotation.InternalApi
ActorRefAdapter(ref)
}
override def refPrefix: String = "user"
override def address: Address = system.provider.getDefaultAddress
}

View file

@ -7,17 +7,14 @@ package akka.actor.typed.scaladsl
import java.util.concurrent.TimeoutException
import scala.concurrent.Future
import com.github.ghik.silencer.silent
import akka.actor.{ Address, RootActorPath }
import akka.actor.typed.ActorRef
import akka.actor.typed.ActorSystem
import akka.actor.typed.RecipientRef
import akka.actor.typed.Scheduler
import akka.actor.typed.internal.{ adapter => adapt }
import akka.actor.typed.internal.InternalRecipientRef
import akka.annotation.{ InternalApi, InternalStableApi }
import akka.annotation.InternalStableApi
import akka.pattern.PromiseActorRef
import akka.util.{ unused, Timeout }
@ -138,7 +135,7 @@ object AskPattern {
null)
else {
// messageClassName "unknown' is set later, after applying the message factory
val a = PromiseActorRef(target.provider, timeout, target, "unknown", onTimeout = onTimeout)
val a = PromiseActorRef(target.provider, timeout, target, "unknown", target.refPrefix, onTimeout = onTimeout)
val b = adapt.ActorRefAdapter[U](a)
(b, a.result.future.asInstanceOf[Future[U]], a)
}
@ -161,9 +158,4 @@ object AskPattern {
p.ask(target, m, timeout)
}
/**
* INTERNAL API
*/
@InternalApi
private[typed] val AskPath = RootActorPath(Address("akka.actor.typed.internal", "ask"))
}

View file

@ -0,0 +1,6 @@
# target actor name in the temporary ask actor name #29205, changes to ask internals
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.actor.ActorRefProvider.tempPath")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.pattern.PromiseActorRef.apply")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.pattern.PromiseActorRef.apply$default$5")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.pattern.PromiseActorRef.apply$default$6")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.pattern.PromiseActorRef.this")

View file

@ -9,7 +9,6 @@ import java.util.concurrent.atomic.AtomicLong
import scala.annotation.implicitNotFound
import scala.concurrent.{ ExecutionContextExecutor, Future, Promise }
import scala.util.control.NonFatal
import akka.ConfigurationException
import akka.annotation.DoNotInherit
import akka.annotation.InternalApi
@ -89,6 +88,11 @@ import akka.util.OptionVal
*/
def tempPath(): ActorPath
/**
* Generates and returns a unique actor path starting with `prefix` below /temp.
*/
def tempPath(prefix: String): ActorPath
/**
* Returns the actor reference representing the /temp path.
*/
@ -414,7 +418,17 @@ private[akka] class LocalActorRefProvider private[akka] (
private val tempNode = rootPath / "temp"
override def tempPath(): ActorPath = tempNode / Helpers.base64(tempNumber.getAndIncrement())
override def tempPath(): ActorPath = tempPath("")
override def tempPath(prefix: String): ActorPath = {
val builder = new java.lang.StringBuilder()
if (prefix.nonEmpty) {
builder.append(prefix)
}
builder.append("$")
Helpers.base64(tempNumber.getAndIncrement(), builder)
tempNode / builder.toString
}
/**
* Top-level anchor for the supervision hierarchy of this actor system. Will

View file

@ -4,19 +4,19 @@
package akka.pattern
import java.net.URLEncoder
import java.util.concurrent.TimeoutException
import scala.annotation.tailrec
import scala.concurrent.{ Future, Promise }
import scala.language.implicitConversions
import scala.util.{ Failure, Success }
import com.github.ghik.silencer.silent
import akka.actor._
import akka.annotation.{ InternalApi, InternalStableApi }
import akka.dispatch.ExecutionContexts
import akka.dispatch.sysmsg._
import akka.util.ByteString
import akka.util.{ Timeout, Unsafe }
import akka.util.unused
@ -340,7 +340,7 @@ final class AskableActorRef(val actorRef: ActorRef) extends AnyVal {
if (timeout.duration.length <= 0)
Future.failed[Any](AskableActorRef.negativeTimeoutException(actorRef, message, sender))
else {
PromiseActorRef(ref.provider, timeout, targetName = actorRef, message.getClass.getName, sender)
PromiseActorRef(ref.provider, timeout, targetName = actorRef, message.getClass.getName, ref.path.name, sender)
.ask(actorRef, message, timeout)
}
case _ => Future.failed[Any](AskableActorRef.unsupportedRecipientType(actorRef, message, sender))
@ -373,7 +373,7 @@ final class ExplicitlyAskableActorRef(val actorRef: ActorRef) extends AnyVal {
val message = messageFactory(ref.provider.deadLetters)
Future.failed[Any](AskableActorRef.negativeTimeoutException(actorRef, message, sender))
} else {
val a = PromiseActorRef(ref.provider, timeout, targetName = actorRef, "unknown", sender)
val a = PromiseActorRef(ref.provider, timeout, targetName = actorRef, "unknown", ref.path.name, sender)
val message = messageFactory(a)
a.messageClassName = message.getClass.getName
a.ask(actorRef, message, timeout)
@ -422,7 +422,8 @@ final class AskableActorSelection(val actorSel: ActorSelection) extends AnyVal {
if (timeout.duration.length <= 0)
Future.failed[Any](AskableActorRef.negativeTimeoutException(actorSel, message, sender))
else {
PromiseActorRef(ref.provider, timeout, targetName = actorSel, message.getClass.getName, sender)
val refPrefix = URLEncoder.encode(actorSel.pathString.replace("/", "_"), ByteString.UTF_8)
PromiseActorRef(ref.provider, timeout, targetName = actorSel, message.getClass.getName, refPrefix, sender)
.ask(actorSel, message, timeout)
}
case _ => Future.failed[Any](AskableActorRef.unsupportedRecipientType(actorSel, message, sender))
@ -450,7 +451,8 @@ final class ExplicitlyAskableActorSelection(val actorSel: ActorSelection) extend
val message = messageFactory(ref.provider.deadLetters)
Future.failed[Any](AskableActorRef.negativeTimeoutException(actorSel, message, sender))
} else {
val a = PromiseActorRef(ref.provider, timeout, targetName = actorSel, "unknown", sender)
val refPrefix = URLEncoder.encode(actorSel.pathString.replace("/", "_"), ByteString.UTF_8)
val a = PromiseActorRef(ref.provider, timeout, targetName = actorSel, "unknown", refPrefix, sender)
val message = messageFactory(a)
a.messageClassName = message.getClass.getName
a.ask(actorSel, message, timeout)
@ -476,7 +478,8 @@ final class ExplicitlyAskableActorSelection(val actorSel: ActorSelection) extend
private[akka] final class PromiseActorRef private (
val provider: ActorRefProvider,
val result: Promise[Any],
_mcn: String)
_mcn: String,
refPathPrefix: String)
extends MinimalActorRef {
import AbstractPromiseActorRef.{ stateOffset, watchedByOffset }
import PromiseActorRef._
@ -553,7 +556,7 @@ private[akka] final class PromiseActorRef private (
if (updateState(null, Registering)) {
var p: ActorPath = null
try {
p = provider.tempPath()
p = provider.tempPath(refPathPrefix)
provider.registerTempActor(this, p)
p
} finally {
@ -670,11 +673,14 @@ private[akka] object PromiseActorRef {
timeout: Timeout,
targetName: Any,
messageClassName: String,
refPathPrefix: String,
sender: ActorRef = Actor.noSender,
onTimeout: String => Throwable = defaultOnTimeout): PromiseActorRef = {
if (refPathPrefix.indexOf('/') > -1)
throw new IllegalArgumentException(s"refPathPrefix must not contain slash, was: $refPathPrefix")
val result = Promise[Any]()
val scheduler = provider.guardian.underlying.system.scheduler
val a = new PromiseActorRef(provider, result, messageClassName)
val a = new PromiseActorRef(provider, result, messageClassName, refPathPrefix)
implicit val ec = ExecutionContexts.parasitic
val f = scheduler.scheduleOnce(timeout.duration) {
val timedOut = result.tryComplete {

View file

@ -48,7 +48,8 @@ trait GracefulStopSupport {
*/
def gracefulStop(target: ActorRef, timeout: FiniteDuration, stopMessage: Any = PoisonPill): Future[Boolean] = {
val internalTarget = target.asInstanceOf[InternalActorRef]
val ref = PromiseActorRef(internalTarget.provider, Timeout(timeout), target, stopMessage.getClass.getName)
val ref =
PromiseActorRef(internalTarget.provider, Timeout(timeout), target, stopMessage.getClass.getName, target.path.name)
internalTarget.sendSystemMessage(Watch(internalTarget, ref))
target.tell(stopMessage, Actor.noSender)
ref.result.future.transform({

View file

@ -154,7 +154,8 @@ private[akka] final class AskPromiseRef private (promiseActorRef: PromiseActorRe
private[akka] object AskPromiseRef {
def apply(provider: ActorRefProvider, timeout: Timeout): AskPromiseRef = {
if (timeout.duration.length > 0) {
val promiseActorRef = PromiseActorRef(provider, timeout, "unknown", "unknown", provider.deadLetters)
val promiseActorRef =
PromiseActorRef(provider, timeout, "unknown", "unknown", "deadLetters", provider.deadLetters)
new AskPromiseRef(promiseActorRef)
} else {
throw new IllegalArgumentException(s"Timeout length must not be negative, was: $timeout")

View file

@ -0,0 +1,2 @@
# target actor name in the temporary ask actor name #29205, changes to ask internals
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.typed.internal.EntityRefImpl#EntityPromiseRef.this")

View file

@ -12,7 +12,6 @@ import java.util.concurrent.ConcurrentHashMap
import scala.compat.java8.FutureConverters._
import scala.concurrent.Future
import akka.actor.ActorRefProvider
import akka.actor.ExtendedActorSystem
import akka.actor.InternalActorRef
@ -303,11 +302,13 @@ import akka.util.JavaDurationConverters._
with scaladsl.EntityRef[M]
with InternalRecipientRef[M] {
override val refPrefix = URLEncoder.encode(s"${typeKey.name}-$entityId", ByteString.UTF_8)
override def tell(msg: M): Unit =
shardRegion ! ShardingEnvelope(entityId, msg)
override def ask[U](message: ActorRef[U] => M)(implicit timeout: Timeout): Future[U] = {
val replyTo = new EntityPromiseRef[U](shardRegion.asInstanceOf[InternalActorRef], timeout)
val replyTo = new EntityPromiseRef[U](shardRegion.asInstanceOf[InternalActorRef], timeout, refPrefix)
val m = message(replyTo.ref)
if (replyTo.promiseRef ne null) replyTo.promiseRef.messageClassName = m.getClass.getName
replyTo.ask(shardRegion, entityId, m, timeout)
@ -318,7 +319,7 @@ import akka.util.JavaDurationConverters._
/** Similar to [[akka.actor.typed.scaladsl.AskPattern.PromiseRef]] but for an `EntityRef` target. */
@InternalApi
private final class EntityPromiseRef[U](classic: InternalActorRef, timeout: Timeout) {
private final class EntityPromiseRef[U](classic: InternalActorRef, timeout: Timeout, refPathPrefix: String) {
import akka.actor.typed.internal.{ adapter => adapt }
// Note: _promiseRef mustn't have a type pattern, since it can be null
@ -339,7 +340,12 @@ import akka.util.JavaDurationConverters._
else {
// note that the real messageClassName will be set afterwards, replyTo pattern
val a =
PromiseActorRef(classic.provider, timeout, targetName = EntityRefImpl.this, messageClassName = "unknown")
PromiseActorRef(
classic.provider,
timeout,
targetName = EntityRefImpl.this,
messageClassName = "unknown",
refPathPrefix)
val b = adapt.ActorRefAdapter[U](a)
(b, a.result.future.asInstanceOf[Future[U]], a)
}
@ -372,7 +378,6 @@ import akka.util.JavaDurationConverters._
}
override def toString: String = s"EntityRef($typeKey, $entityId)"
}
/**

View file

@ -78,7 +78,7 @@ object ClusterShardingSpec {
case (ctx, WhoAreYou(replyTo)) =>
val address = Cluster(ctx.system).selfMember.address
replyTo ! s"I'm ${ctx.self.path.name} at ${address.host.get}:${address.port.get}"
replyTo ! s"I'm ${ctx.self.path.name} at ${address.host.get}:${address.port.get} responding to $replyTo"
Behaviors.same
case (_, ReplyPlz(toMe)) =>
@ -270,7 +270,10 @@ class ClusterShardingSpec
val charlieRef = sharding.entityRefFor(typeKeyWithEnvelopes, "charlie")
val reply1 = bobRef ? WhoAreYou // TODO document that WhoAreYou(_) would not work
reply1.futureValue should startWith("I'm bob")
val response = reply1.futureValue
response should startWith("I'm bob")
// typekey and entity id encoded in promise ref path
response should include(s"${typeKeyWithEnvelopes.name}-bob")
val reply2 = charlieRef.ask(WhoAreYou)
reply2.futureValue should startWith("I'm charlie")
@ -295,7 +298,10 @@ class ClusterShardingSpec
}
})
p.receiveMessage().s should startWith("I'm alice")
val response = p.receiveMessage()
response.s should startWith("I'm alice")
// typekey and entity id encoded in promise ref path
response.s should include(s"${typeKeyWithEnvelopes.name}-alice")
aliceRef ! StopPlz()

View file

@ -199,6 +199,7 @@ private[akka] class RemoteActorRefProvider(
local.registerTempActor(actorRef, path)
override def unregisterTempActor(path: ActorPath): Unit = local.unregisterTempActor(path)
override def tempPath(): ActorPath = local.tempPath()
override def tempPath(prefix: String): ActorPath = local.tempPath(prefix)
override def tempContainer: VirtualPathContainer = local.tempContainer
@volatile private var _internals: Internals = _

View file

@ -358,7 +358,8 @@ private[transport] class ThrottlerManager(wrappedTransport: Transport)
if (target.isTerminated) Future.successful(SetThrottleAck)
else {
val internalTarget = target.asInstanceOf[InternalActorRef]
val ref = PromiseActorRef(internalTarget.provider, timeout, target, mode.getClass.getName)
val ref =
PromiseActorRef(internalTarget.provider, timeout, target, mode.getClass.getName, internalTarget.path.name)
internalTarget.sendSystemMessage(Watch(internalTarget, ref))
target.tell(mode, ref)
ref.result.future.transform({