diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/AskPattern.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/AskPattern.scala index 60359de5e6..5894b194e1 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/AskPattern.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/AskPattern.scala @@ -17,9 +17,9 @@ import akka.actor.typed.RecipientRef import akka.actor.typed.Scheduler import akka.actor.typed.internal.{ adapter => adapt } import akka.actor.typed.internal.InternalRecipientRef -import akka.annotation.InternalApi +import akka.annotation.{ InternalApi, InternalStableApi } import akka.pattern.PromiseActorRef -import akka.util.Timeout +import akka.util.{ unused, Timeout } /** * The ask-pattern implements the initiator side of a request–reply protocol. @@ -146,14 +146,19 @@ object AskPattern { val ref: ActorRef[U] = _ref val future: Future[U] = _future val promiseRef: PromiseActorRef = _promiseRef + + @InternalStableApi + private[akka] def ask[T](target: InternalRecipientRef[T], message: T, @unused timeout: Timeout): Future[U] = { + target ! message + future + } } private def askClassic[T, U](target: InternalRecipientRef[T], timeout: Timeout, f: ActorRef[U] => T): Future[U] = { val p = new PromiseRef[U](target, timeout) val m = f(p.ref) if (p.promiseRef ne null) p.promiseRef.messageClassName = m.getClass.getName - target ! m - p.future + p.ask(target, m, timeout) } /** diff --git a/akka-actor/src/main/scala/akka/pattern/AskSupport.scala b/akka-actor/src/main/scala/akka/pattern/AskSupport.scala index 215d849b6e..c59427c22f 100644 --- a/akka-actor/src/main/scala/akka/pattern/AskSupport.scala +++ b/akka-actor/src/main/scala/akka/pattern/AskSupport.scala @@ -14,10 +14,11 @@ import scala.util.{ Failure, Success } import com.github.ghik.silencer.silent import akka.actor._ -import akka.annotation.InternalApi +import akka.annotation.{ InternalApi, InternalStableApi } import akka.dispatch.ExecutionContexts import akka.dispatch.sysmsg._ import akka.util.{ Timeout, Unsafe } +import akka.util.unused /** * This is what is used to complete a Future that is returned from an ask/? call, @@ -339,9 +340,8 @@ final class AskableActorRef(val actorRef: ActorRef) extends AnyVal { if (timeout.duration.length <= 0) Future.failed[Any](AskableActorRef.negativeTimeoutException(actorRef, message, sender)) else { - val a = PromiseActorRef(ref.provider, timeout, targetName = actorRef, message.getClass.getName, sender) - actorRef.tell(message, a) - a.result.future + PromiseActorRef(ref.provider, timeout, targetName = actorRef, message.getClass.getName, sender) + .ask(actorRef, message, timeout) } case _ => Future.failed[Any](AskableActorRef.unsupportedRecipientType(actorRef, message, sender)) } @@ -376,8 +376,7 @@ final class ExplicitlyAskableActorRef(val actorRef: ActorRef) extends AnyVal { val a = PromiseActorRef(ref.provider, timeout, targetName = actorRef, "unknown", sender) val message = messageFactory(a) a.messageClassName = message.getClass.getName - actorRef.tell(message, a) - a.result.future + a.ask(actorRef, message, timeout) } case _ if sender eq null => Future.failed[Any]( @@ -423,9 +422,8 @@ final class AskableActorSelection(val actorSel: ActorSelection) extends AnyVal { if (timeout.duration.length <= 0) Future.failed[Any](AskableActorRef.negativeTimeoutException(actorSel, message, sender)) else { - val a = PromiseActorRef(ref.provider, timeout, targetName = actorSel, message.getClass.getName, sender) - actorSel.tell(message, a) - a.result.future + PromiseActorRef(ref.provider, timeout, targetName = actorSel, message.getClass.getName, sender) + .ask(actorSel, message, timeout) } case _ => Future.failed[Any](AskableActorRef.unsupportedRecipientType(actorSel, message, sender)) } @@ -455,8 +453,7 @@ final class ExplicitlyAskableActorSelection(val actorSel: ActorSelection) extend val a = PromiseActorRef(ref.provider, timeout, targetName = actorSel, "unknown", sender) val message = messageFactory(a) a.messageClassName = message.getClass.getName - actorSel.tell(message, a) - a.result.future + a.ask(actorSel, message, timeout) } case _ if sender eq null => Future.failed[Any]( @@ -573,7 +570,9 @@ private[akka] final class PromiseActorRef private ( } override def !(message: Any)(implicit sender: ActorRef = Actor.noSender): Unit = state match { - case Stopped | _: StoppedWithPath => provider.deadLetters ! message + case Stopped | _: StoppedWithPath => + provider.deadLetters ! message + onComplete(message, alreadyCompleted = true) case _ => if (message == null) throw InvalidMessageException("Message is null") val promiseResult = message match { @@ -581,8 +580,10 @@ private[akka] final class PromiseActorRef private ( case Status.Failure(f) => Failure(f) case other => Success(other) } - if (!result.tryComplete(promiseResult)) + val alreadyCompleted = !result.tryComplete(promiseResult) + if (alreadyCompleted) provider.deadLetters ! message + onComplete(message, alreadyCompleted) } override def sendSystemMessage(message: SystemMessage): Unit = message match { @@ -632,6 +633,24 @@ private[akka] final class PromiseActorRef private ( case Registering => stop() // spin until registration is completed before stopping } } + + @InternalStableApi + private[akka] def ask(actorSel: ActorSelection, message: Any, @unused timeout: Timeout): Future[Any] = { + actorSel.tell(message, this) + result.future + } + + @InternalStableApi + private[akka] def ask(actorRef: ActorRef, message: Any, @unused timeout: Timeout): Future[Any] = { + actorRef.tell(message, this) + result.future + } + + @InternalStableApi + private[akka] def onComplete(@unused message: Any, @unused alreadyCompleted: Boolean): Unit = {} + + @InternalStableApi + private[akka] def onTimeout(@unused timeout: Timeout): Unit = {} } /** @@ -658,7 +677,7 @@ private[akka] object PromiseActorRef { val a = new PromiseActorRef(provider, result, messageClassName) implicit val ec = ExecutionContexts.parasitic val f = scheduler.scheduleOnce(timeout.duration) { - result.tryComplete { + val timedOut = result.tryComplete { val wasSentBy = if (sender == ActorRef.noSender) "" else s" was sent by [$sender]" val messagePart = s"Message of type [${a.messageClassName}]$wasSentBy." Failure( @@ -667,6 +686,9 @@ private[akka] object PromiseActorRef { messagePart + " A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply.")) } + if (timedOut) { + a.onTimeout(timeout) + } } result.future.onComplete { _ => try a.stop() diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ClusterShardingImpl.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ClusterShardingImpl.scala index ff7a5f5cdd..dbf2aa39cd 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ClusterShardingImpl.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ClusterShardingImpl.scala @@ -27,7 +27,7 @@ import akka.actor.typed.internal.PoisonPillInterceptor import akka.actor.typed.internal.adapter.ActorRefAdapter import akka.actor.typed.internal.adapter.ActorSystemAdapter import akka.actor.typed.scaladsl.Behaviors -import akka.annotation.InternalApi +import akka.annotation.{ InternalApi, InternalStableApi } import akka.cluster.ClusterSettings.DataCenter import akka.cluster.sharding.ShardCoordinator.LeastShardAllocationStrategy import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy @@ -40,9 +40,8 @@ import akka.event.LoggingAdapter import akka.japi.function.{ Function => JFunction } import akka.pattern.AskTimeoutException import akka.pattern.PromiseActorRef -import akka.util.ByteString +import akka.util.{ unused, ByteString, Timeout } import akka.util.JavaDurationConverters._ -import akka.util.Timeout /** * INTERNAL API @@ -311,8 +310,7 @@ import akka.util.Timeout val replyTo = new EntityPromiseRef[U](shardRegion.asInstanceOf[InternalActorRef], timeout) val m = message(replyTo.ref) if (replyTo.promiseRef ne null) replyTo.promiseRef.messageClassName = m.getClass.getName - shardRegion ! ShardingEnvelope(entityId, m) - replyTo.future + replyTo.ask(shardRegion, entityId, m, timeout) } def ask[U](message: JFunction[ActorRef[U], M], timeout: Duration): CompletionStage[U] = @@ -349,6 +347,16 @@ import akka.util.Timeout val ref: ActorRef[U] = _ref val future: Future[U] = _future val promiseRef: PromiseActorRef = _promiseRef + + @InternalStableApi + private[akka] def ask[T]( + shardRegion: akka.actor.ActorRef, + entityId: String, + message: T, + @unused timeout: Timeout): Future[U] = { + shardRegion ! ShardingEnvelope(entityId, message) + future + } } // impl InternalRecipientRef