diff --git a/akka-actor-tests/src/test/scala/akka/pattern/AskSpec.scala b/akka-actor-tests/src/test/scala/akka/pattern/AskSpec.scala index e8a0bde681..37d74cf925 100644 --- a/akka-actor-tests/src/test/scala/akka/pattern/AskSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/pattern/AskSpec.scala @@ -7,16 +7,19 @@ package akka.pattern import scala.concurrent.Await import scala.concurrent.duration._ import scala.util.Failure - import com.github.ghik.silencer.silent -import language.postfixOps +import language.postfixOps import akka.actor._ +import akka.testkit.WithLogCapturing import akka.testkit.{ AkkaSpec, TestProbe } import akka.util.Timeout @silent -class AskSpec extends AkkaSpec { +class AskSpec extends AkkaSpec(""" + akka.loglevel = DEBUG + akka.loggers = ["akka.testkit.SilenceAllTestEventListener"] + """) with WithLogCapturing { "The “ask” pattern" must { "send request to actor and wrap the answer in Future" in { @@ -232,6 +235,26 @@ class AskSpec extends AkkaSpec { completed should ===("complete") expectTerminated(promiseActorRef, 1.second) } + + "encode target name in temporary actor name" in { + implicit val timeout: Timeout = Timeout(300 millis) + val p = TestProbe() + + val act = system.actorOf(Props(new Actor { + def receive = { + case msg => p.ref ! sender() -> msg + } + }), "myName") + + (act ? "ask").mapTo[String] + val (promiseActorRef, "ask") = p.expectMsgType[(ActorRef, String)] + + promiseActorRef.path.name should startWith("myName") + + (system.actorSelection("/user/myName") ? "ask").mapTo[String] + val (promiseActorRefForSelection, "ask") = p.expectMsgType[(ActorRef, String)] + promiseActorRefForSelection.path.name should startWith("_user_myName") + } } } diff --git a/akka-actor-typed/src/main/mima-filters/2.6.6.backwards.excludes/actor-path-in-ask-temp-name.excludes b/akka-actor-typed/src/main/mima-filters/2.6.6.backwards.excludes/actor-path-in-ask-temp-name.excludes new file mode 100644 index 0000000000..afb755240f --- /dev/null +++ b/akka-actor-typed/src/main/mima-filters/2.6.6.backwards.excludes/actor-path-in-ask-temp-name.excludes @@ -0,0 +1,2 @@ +# target actor name in the temporary ask actor name #29205, changes to ask internals +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.typed.scaladsl.AskPattern.AskPath") \ No newline at end of file diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/InternalRecipientRef.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/InternalRecipientRef.scala index e91843124d..6f3b8d7ba1 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/InternalRecipientRef.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/InternalRecipientRef.scala @@ -23,4 +23,6 @@ import akka.annotation.InternalApi */ def isTerminated: Boolean + def refPrefix: String = toString + } diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorRefAdapter.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorRefAdapter.scala index 5bd523bec7..aaad47418d 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorRefAdapter.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorRefAdapter.scala @@ -38,6 +38,8 @@ import akka.dispatch.sysmsg // impl InternalRecipientRef def isTerminated: Boolean = classicRef.isTerminated + override def refPrefix: String = path.name + @throws(classOf[java.io.ObjectStreamException]) private def writeReplace(): AnyRef = SerializedActorRef[T](this) } diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorSystemAdapter.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorSystemAdapter.scala index d5318e3c45..367c3f88ce 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorSystemAdapter.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorSystemAdapter.scala @@ -121,6 +121,8 @@ import akka.annotation.InternalApi ActorRefAdapter(ref) } + override def refPrefix: String = "user" + override def address: Address = system.provider.getDefaultAddress } diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/AskPattern.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/AskPattern.scala index 5894b194e1..082989dafb 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/AskPattern.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/AskPattern.scala @@ -7,17 +7,14 @@ package akka.actor.typed.scaladsl import java.util.concurrent.TimeoutException import scala.concurrent.Future - import com.github.ghik.silencer.silent - -import akka.actor.{ Address, RootActorPath } import akka.actor.typed.ActorRef import akka.actor.typed.ActorSystem import akka.actor.typed.RecipientRef import akka.actor.typed.Scheduler import akka.actor.typed.internal.{ adapter => adapt } import akka.actor.typed.internal.InternalRecipientRef -import akka.annotation.{ InternalApi, InternalStableApi } +import akka.annotation.InternalStableApi import akka.pattern.PromiseActorRef import akka.util.{ unused, Timeout } @@ -138,7 +135,7 @@ object AskPattern { null) else { // messageClassName "unknown' is set later, after applying the message factory - val a = PromiseActorRef(target.provider, timeout, target, "unknown", onTimeout = onTimeout) + val a = PromiseActorRef(target.provider, timeout, target, "unknown", target.refPrefix, onTimeout = onTimeout) val b = adapt.ActorRefAdapter[U](a) (b, a.result.future.asInstanceOf[Future[U]], a) } @@ -161,9 +158,4 @@ object AskPattern { p.ask(target, m, timeout) } - /** - * INTERNAL API - */ - @InternalApi - private[typed] val AskPath = RootActorPath(Address("akka.actor.typed.internal", "ask")) } diff --git a/akka-actor/src/main/mima-filters/2.6.6.backwards.excludes/actor-path-in-ask-temp-name.excludes b/akka-actor/src/main/mima-filters/2.6.6.backwards.excludes/actor-path-in-ask-temp-name.excludes new file mode 100644 index 0000000000..034b1f5b45 --- /dev/null +++ b/akka-actor/src/main/mima-filters/2.6.6.backwards.excludes/actor-path-in-ask-temp-name.excludes @@ -0,0 +1,6 @@ +# target actor name in the temporary ask actor name #29205, changes to ask internals +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.actor.ActorRefProvider.tempPath") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.pattern.PromiseActorRef.apply") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.pattern.PromiseActorRef.apply$default$5") +ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.pattern.PromiseActorRef.apply$default$6") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.pattern.PromiseActorRef.this") \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index 3eaf85f406..ee5f466051 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -9,7 +9,6 @@ import java.util.concurrent.atomic.AtomicLong import scala.annotation.implicitNotFound import scala.concurrent.{ ExecutionContextExecutor, Future, Promise } import scala.util.control.NonFatal - import akka.ConfigurationException import akka.annotation.DoNotInherit import akka.annotation.InternalApi @@ -89,6 +88,11 @@ import akka.util.OptionVal */ def tempPath(): ActorPath + /** + * Generates and returns a unique actor path starting with `prefix` below “/temp”. + */ + def tempPath(prefix: String): ActorPath + /** * Returns the actor reference representing the “/temp” path. */ @@ -414,7 +418,17 @@ private[akka] class LocalActorRefProvider private[akka] ( private val tempNode = rootPath / "temp" - override def tempPath(): ActorPath = tempNode / Helpers.base64(tempNumber.getAndIncrement()) + override def tempPath(): ActorPath = tempPath("") + + override def tempPath(prefix: String): ActorPath = { + val builder = new java.lang.StringBuilder() + if (prefix.nonEmpty) { + builder.append(prefix) + } + builder.append("$") + Helpers.base64(tempNumber.getAndIncrement(), builder) + tempNode / builder.toString + } /** * Top-level anchor for the supervision hierarchy of this actor system. Will diff --git a/akka-actor/src/main/scala/akka/pattern/AskSupport.scala b/akka-actor/src/main/scala/akka/pattern/AskSupport.scala index c59427c22f..183f5aea5c 100644 --- a/akka-actor/src/main/scala/akka/pattern/AskSupport.scala +++ b/akka-actor/src/main/scala/akka/pattern/AskSupport.scala @@ -4,19 +4,19 @@ package akka.pattern +import java.net.URLEncoder import java.util.concurrent.TimeoutException import scala.annotation.tailrec import scala.concurrent.{ Future, Promise } import scala.language.implicitConversions import scala.util.{ Failure, Success } - import com.github.ghik.silencer.silent - import akka.actor._ import akka.annotation.{ InternalApi, InternalStableApi } import akka.dispatch.ExecutionContexts import akka.dispatch.sysmsg._ +import akka.util.ByteString import akka.util.{ Timeout, Unsafe } import akka.util.unused @@ -340,7 +340,7 @@ final class AskableActorRef(val actorRef: ActorRef) extends AnyVal { if (timeout.duration.length <= 0) Future.failed[Any](AskableActorRef.negativeTimeoutException(actorRef, message, sender)) else { - PromiseActorRef(ref.provider, timeout, targetName = actorRef, message.getClass.getName, sender) + PromiseActorRef(ref.provider, timeout, targetName = actorRef, message.getClass.getName, ref.path.name, sender) .ask(actorRef, message, timeout) } case _ => Future.failed[Any](AskableActorRef.unsupportedRecipientType(actorRef, message, sender)) @@ -373,7 +373,7 @@ final class ExplicitlyAskableActorRef(val actorRef: ActorRef) extends AnyVal { val message = messageFactory(ref.provider.deadLetters) Future.failed[Any](AskableActorRef.negativeTimeoutException(actorRef, message, sender)) } else { - val a = PromiseActorRef(ref.provider, timeout, targetName = actorRef, "unknown", sender) + val a = PromiseActorRef(ref.provider, timeout, targetName = actorRef, "unknown", ref.path.name, sender) val message = messageFactory(a) a.messageClassName = message.getClass.getName a.ask(actorRef, message, timeout) @@ -422,7 +422,8 @@ final class AskableActorSelection(val actorSel: ActorSelection) extends AnyVal { if (timeout.duration.length <= 0) Future.failed[Any](AskableActorRef.negativeTimeoutException(actorSel, message, sender)) else { - PromiseActorRef(ref.provider, timeout, targetName = actorSel, message.getClass.getName, sender) + val refPrefix = URLEncoder.encode(actorSel.pathString.replace("/", "_"), ByteString.UTF_8) + PromiseActorRef(ref.provider, timeout, targetName = actorSel, message.getClass.getName, refPrefix, sender) .ask(actorSel, message, timeout) } case _ => Future.failed[Any](AskableActorRef.unsupportedRecipientType(actorSel, message, sender)) @@ -450,7 +451,8 @@ final class ExplicitlyAskableActorSelection(val actorSel: ActorSelection) extend val message = messageFactory(ref.provider.deadLetters) Future.failed[Any](AskableActorRef.negativeTimeoutException(actorSel, message, sender)) } else { - val a = PromiseActorRef(ref.provider, timeout, targetName = actorSel, "unknown", sender) + val refPrefix = URLEncoder.encode(actorSel.pathString.replace("/", "_"), ByteString.UTF_8) + val a = PromiseActorRef(ref.provider, timeout, targetName = actorSel, "unknown", refPrefix, sender) val message = messageFactory(a) a.messageClassName = message.getClass.getName a.ask(actorSel, message, timeout) @@ -476,7 +478,8 @@ final class ExplicitlyAskableActorSelection(val actorSel: ActorSelection) extend private[akka] final class PromiseActorRef private ( val provider: ActorRefProvider, val result: Promise[Any], - _mcn: String) + _mcn: String, + refPathPrefix: String) extends MinimalActorRef { import AbstractPromiseActorRef.{ stateOffset, watchedByOffset } import PromiseActorRef._ @@ -553,7 +556,7 @@ private[akka] final class PromiseActorRef private ( if (updateState(null, Registering)) { var p: ActorPath = null try { - p = provider.tempPath() + p = provider.tempPath(refPathPrefix) provider.registerTempActor(this, p) p } finally { @@ -670,11 +673,14 @@ private[akka] object PromiseActorRef { timeout: Timeout, targetName: Any, messageClassName: String, + refPathPrefix: String, sender: ActorRef = Actor.noSender, onTimeout: String => Throwable = defaultOnTimeout): PromiseActorRef = { + if (refPathPrefix.indexOf('/') > -1) + throw new IllegalArgumentException(s"refPathPrefix must not contain slash, was: $refPathPrefix") val result = Promise[Any]() val scheduler = provider.guardian.underlying.system.scheduler - val a = new PromiseActorRef(provider, result, messageClassName) + val a = new PromiseActorRef(provider, result, messageClassName, refPathPrefix) implicit val ec = ExecutionContexts.parasitic val f = scheduler.scheduleOnce(timeout.duration) { val timedOut = result.tryComplete { diff --git a/akka-actor/src/main/scala/akka/pattern/GracefulStopSupport.scala b/akka-actor/src/main/scala/akka/pattern/GracefulStopSupport.scala index c1e50fce25..f9482efe9a 100644 --- a/akka-actor/src/main/scala/akka/pattern/GracefulStopSupport.scala +++ b/akka-actor/src/main/scala/akka/pattern/GracefulStopSupport.scala @@ -48,7 +48,8 @@ trait GracefulStopSupport { */ def gracefulStop(target: ActorRef, timeout: FiniteDuration, stopMessage: Any = PoisonPill): Future[Boolean] = { val internalTarget = target.asInstanceOf[InternalActorRef] - val ref = PromiseActorRef(internalTarget.provider, Timeout(timeout), target, stopMessage.getClass.getName) + val ref = + PromiseActorRef(internalTarget.provider, Timeout(timeout), target, stopMessage.getClass.getName, target.path.name) internalTarget.sendSystemMessage(Watch(internalTarget, ref)) target.tell(stopMessage, Actor.noSender) ref.result.future.transform({ diff --git a/akka-actor/src/main/scala/akka/pattern/PromiseRef.scala b/akka-actor/src/main/scala/akka/pattern/PromiseRef.scala index 65d6b75465..a2e056bc10 100644 --- a/akka-actor/src/main/scala/akka/pattern/PromiseRef.scala +++ b/akka-actor/src/main/scala/akka/pattern/PromiseRef.scala @@ -154,7 +154,8 @@ private[akka] final class AskPromiseRef private (promiseActorRef: PromiseActorRe private[akka] object AskPromiseRef { def apply(provider: ActorRefProvider, timeout: Timeout): AskPromiseRef = { if (timeout.duration.length > 0) { - val promiseActorRef = PromiseActorRef(provider, timeout, "unknown", "unknown", provider.deadLetters) + val promiseActorRef = + PromiseActorRef(provider, timeout, "unknown", "unknown", "deadLetters", provider.deadLetters) new AskPromiseRef(promiseActorRef) } else { throw new IllegalArgumentException(s"Timeout length must not be negative, was: $timeout") diff --git a/akka-cluster-sharding-typed/src/main/mima-filters/2.6.6.backwards.excludes/actor-path-in-ask-temp-name.excludes b/akka-cluster-sharding-typed/src/main/mima-filters/2.6.6.backwards.excludes/actor-path-in-ask-temp-name.excludes new file mode 100644 index 0000000000..de58572864 --- /dev/null +++ b/akka-cluster-sharding-typed/src/main/mima-filters/2.6.6.backwards.excludes/actor-path-in-ask-temp-name.excludes @@ -0,0 +1,2 @@ +# target actor name in the temporary ask actor name #29205, changes to ask internals +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.typed.internal.EntityRefImpl#EntityPromiseRef.this") \ No newline at end of file diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ClusterShardingImpl.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ClusterShardingImpl.scala index dbf2aa39cd..df61d1b9e5 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ClusterShardingImpl.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ClusterShardingImpl.scala @@ -12,7 +12,6 @@ import java.util.concurrent.ConcurrentHashMap import scala.compat.java8.FutureConverters._ import scala.concurrent.Future - import akka.actor.ActorRefProvider import akka.actor.ExtendedActorSystem import akka.actor.InternalActorRef @@ -303,11 +302,13 @@ import akka.util.JavaDurationConverters._ with scaladsl.EntityRef[M] with InternalRecipientRef[M] { + override val refPrefix = URLEncoder.encode(s"${typeKey.name}-$entityId", ByteString.UTF_8) + override def tell(msg: M): Unit = shardRegion ! ShardingEnvelope(entityId, msg) override def ask[U](message: ActorRef[U] => M)(implicit timeout: Timeout): Future[U] = { - val replyTo = new EntityPromiseRef[U](shardRegion.asInstanceOf[InternalActorRef], timeout) + val replyTo = new EntityPromiseRef[U](shardRegion.asInstanceOf[InternalActorRef], timeout, refPrefix) val m = message(replyTo.ref) if (replyTo.promiseRef ne null) replyTo.promiseRef.messageClassName = m.getClass.getName replyTo.ask(shardRegion, entityId, m, timeout) @@ -318,7 +319,7 @@ import akka.util.JavaDurationConverters._ /** Similar to [[akka.actor.typed.scaladsl.AskPattern.PromiseRef]] but for an `EntityRef` target. */ @InternalApi - private final class EntityPromiseRef[U](classic: InternalActorRef, timeout: Timeout) { + private final class EntityPromiseRef[U](classic: InternalActorRef, timeout: Timeout, refPathPrefix: String) { import akka.actor.typed.internal.{ adapter => adapt } // Note: _promiseRef mustn't have a type pattern, since it can be null @@ -339,7 +340,12 @@ import akka.util.JavaDurationConverters._ else { // note that the real messageClassName will be set afterwards, replyTo pattern val a = - PromiseActorRef(classic.provider, timeout, targetName = EntityRefImpl.this, messageClassName = "unknown") + PromiseActorRef( + classic.provider, + timeout, + targetName = EntityRefImpl.this, + messageClassName = "unknown", + refPathPrefix) val b = adapt.ActorRefAdapter[U](a) (b, a.result.future.asInstanceOf[Future[U]], a) } @@ -372,7 +378,6 @@ import akka.util.JavaDurationConverters._ } override def toString: String = s"EntityRef($typeKey, $entityId)" - } /** diff --git a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingSpec.scala b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingSpec.scala index 9417b5b286..020911cb88 100644 --- a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingSpec.scala +++ b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingSpec.scala @@ -78,7 +78,7 @@ object ClusterShardingSpec { case (ctx, WhoAreYou(replyTo)) => val address = Cluster(ctx.system).selfMember.address - replyTo ! s"I'm ${ctx.self.path.name} at ${address.host.get}:${address.port.get}" + replyTo ! s"I'm ${ctx.self.path.name} at ${address.host.get}:${address.port.get} responding to $replyTo" Behaviors.same case (_, ReplyPlz(toMe)) => @@ -270,7 +270,10 @@ class ClusterShardingSpec val charlieRef = sharding.entityRefFor(typeKeyWithEnvelopes, "charlie") val reply1 = bobRef ? WhoAreYou // TODO document that WhoAreYou(_) would not work - reply1.futureValue should startWith("I'm bob") + val response = reply1.futureValue + response should startWith("I'm bob") + // typekey and entity id encoded in promise ref path + response should include(s"${typeKeyWithEnvelopes.name}-bob") val reply2 = charlieRef.ask(WhoAreYou) reply2.futureValue should startWith("I'm charlie") @@ -295,7 +298,10 @@ class ClusterShardingSpec } }) - p.receiveMessage().s should startWith("I'm alice") + val response = p.receiveMessage() + response.s should startWith("I'm alice") + // typekey and entity id encoded in promise ref path + response.s should include(s"${typeKeyWithEnvelopes.name}-alice") aliceRef ! StopPlz() diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index 889c37a174..3e7b705eca 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -199,6 +199,7 @@ private[akka] class RemoteActorRefProvider( local.registerTempActor(actorRef, path) override def unregisterTempActor(path: ActorPath): Unit = local.unregisterTempActor(path) override def tempPath(): ActorPath = local.tempPath() + override def tempPath(prefix: String): ActorPath = local.tempPath(prefix) override def tempContainer: VirtualPathContainer = local.tempContainer @volatile private var _internals: Internals = _ diff --git a/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala b/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala index 8a41d7f264..ea074ba871 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala @@ -358,7 +358,8 @@ private[transport] class ThrottlerManager(wrappedTransport: Transport) if (target.isTerminated) Future.successful(SetThrottleAck) else { val internalTarget = target.asInstanceOf[InternalActorRef] - val ref = PromiseActorRef(internalTarget.provider, timeout, target, mode.getClass.getName) + val ref = + PromiseActorRef(internalTarget.provider, timeout, target, mode.getClass.getName, internalTarget.path.name) internalTarget.sendSystemMessage(Watch(internalTarget, ref)) target.tell(mode, ref) ref.result.future.transform({