Add necessary telemetry interceptors for Akka 2.6 AskPattern (#28580)

* telemetry interceptors for Akka Classic AskPattern
* telemetry interceptors for Akka Typed AskPattern
* Call onComplete even if the ask future is already completed to intercept late responses
* necessary telemetry interceptors for Akka sharded Typed Entity AskPattern
This commit is contained in:
Yury Gribkov 2020-05-05 11:13:21 -04:00 committed by GitHub
parent b52cda7b34
commit 0099ebc2b8
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 58 additions and 23 deletions

View file

@ -17,9 +17,9 @@ import akka.actor.typed.RecipientRef
import akka.actor.typed.Scheduler
import akka.actor.typed.internal.{ adapter => adapt }
import akka.actor.typed.internal.InternalRecipientRef
import akka.annotation.InternalApi
import akka.annotation.{ InternalApi, InternalStableApi }
import akka.pattern.PromiseActorRef
import akka.util.Timeout
import akka.util.{ unused, Timeout }
/**
* The ask-pattern implements the initiator side of a requestreply protocol.
@ -146,14 +146,19 @@ object AskPattern {
val ref: ActorRef[U] = _ref
val future: Future[U] = _future
val promiseRef: PromiseActorRef = _promiseRef
@InternalStableApi
private[akka] def ask[T](target: InternalRecipientRef[T], message: T, @unused timeout: Timeout): Future[U] = {
target ! message
future
}
}
private def askClassic[T, U](target: InternalRecipientRef[T], timeout: Timeout, f: ActorRef[U] => T): Future[U] = {
val p = new PromiseRef[U](target, timeout)
val m = f(p.ref)
if (p.promiseRef ne null) p.promiseRef.messageClassName = m.getClass.getName
target ! m
p.future
p.ask(target, m, timeout)
}
/**

View file

@ -14,10 +14,11 @@ import scala.util.{ Failure, Success }
import com.github.ghik.silencer.silent
import akka.actor._
import akka.annotation.InternalApi
import akka.annotation.{ InternalApi, InternalStableApi }
import akka.dispatch.ExecutionContexts
import akka.dispatch.sysmsg._
import akka.util.{ Timeout, Unsafe }
import akka.util.unused
/**
* This is what is used to complete a Future that is returned from an ask/? call,
@ -339,9 +340,8 @@ final class AskableActorRef(val actorRef: ActorRef) extends AnyVal {
if (timeout.duration.length <= 0)
Future.failed[Any](AskableActorRef.negativeTimeoutException(actorRef, message, sender))
else {
val a = PromiseActorRef(ref.provider, timeout, targetName = actorRef, message.getClass.getName, sender)
actorRef.tell(message, a)
a.result.future
PromiseActorRef(ref.provider, timeout, targetName = actorRef, message.getClass.getName, sender)
.ask(actorRef, message, timeout)
}
case _ => Future.failed[Any](AskableActorRef.unsupportedRecipientType(actorRef, message, sender))
}
@ -376,8 +376,7 @@ final class ExplicitlyAskableActorRef(val actorRef: ActorRef) extends AnyVal {
val a = PromiseActorRef(ref.provider, timeout, targetName = actorRef, "unknown", sender)
val message = messageFactory(a)
a.messageClassName = message.getClass.getName
actorRef.tell(message, a)
a.result.future
a.ask(actorRef, message, timeout)
}
case _ if sender eq null =>
Future.failed[Any](
@ -423,9 +422,8 @@ final class AskableActorSelection(val actorSel: ActorSelection) extends AnyVal {
if (timeout.duration.length <= 0)
Future.failed[Any](AskableActorRef.negativeTimeoutException(actorSel, message, sender))
else {
val a = PromiseActorRef(ref.provider, timeout, targetName = actorSel, message.getClass.getName, sender)
actorSel.tell(message, a)
a.result.future
PromiseActorRef(ref.provider, timeout, targetName = actorSel, message.getClass.getName, sender)
.ask(actorSel, message, timeout)
}
case _ => Future.failed[Any](AskableActorRef.unsupportedRecipientType(actorSel, message, sender))
}
@ -455,8 +453,7 @@ final class ExplicitlyAskableActorSelection(val actorSel: ActorSelection) extend
val a = PromiseActorRef(ref.provider, timeout, targetName = actorSel, "unknown", sender)
val message = messageFactory(a)
a.messageClassName = message.getClass.getName
actorSel.tell(message, a)
a.result.future
a.ask(actorSel, message, timeout)
}
case _ if sender eq null =>
Future.failed[Any](
@ -573,7 +570,9 @@ private[akka] final class PromiseActorRef private (
}
override def !(message: Any)(implicit sender: ActorRef = Actor.noSender): Unit = state match {
case Stopped | _: StoppedWithPath => provider.deadLetters ! message
case Stopped | _: StoppedWithPath =>
provider.deadLetters ! message
onComplete(message, alreadyCompleted = true)
case _ =>
if (message == null) throw InvalidMessageException("Message is null")
val promiseResult = message match {
@ -581,8 +580,10 @@ private[akka] final class PromiseActorRef private (
case Status.Failure(f) => Failure(f)
case other => Success(other)
}
if (!result.tryComplete(promiseResult))
val alreadyCompleted = !result.tryComplete(promiseResult)
if (alreadyCompleted)
provider.deadLetters ! message
onComplete(message, alreadyCompleted)
}
override def sendSystemMessage(message: SystemMessage): Unit = message match {
@ -632,6 +633,24 @@ private[akka] final class PromiseActorRef private (
case Registering => stop() // spin until registration is completed before stopping
}
}
@InternalStableApi
private[akka] def ask(actorSel: ActorSelection, message: Any, @unused timeout: Timeout): Future[Any] = {
actorSel.tell(message, this)
result.future
}
@InternalStableApi
private[akka] def ask(actorRef: ActorRef, message: Any, @unused timeout: Timeout): Future[Any] = {
actorRef.tell(message, this)
result.future
}
@InternalStableApi
private[akka] def onComplete(@unused message: Any, @unused alreadyCompleted: Boolean): Unit = {}
@InternalStableApi
private[akka] def onTimeout(@unused timeout: Timeout): Unit = {}
}
/**
@ -658,7 +677,7 @@ private[akka] object PromiseActorRef {
val a = new PromiseActorRef(provider, result, messageClassName)
implicit val ec = ExecutionContexts.parasitic
val f = scheduler.scheduleOnce(timeout.duration) {
result.tryComplete {
val timedOut = result.tryComplete {
val wasSentBy = if (sender == ActorRef.noSender) "" else s" was sent by [$sender]"
val messagePart = s"Message of type [${a.messageClassName}]$wasSentBy."
Failure(
@ -667,6 +686,9 @@ private[akka] object PromiseActorRef {
messagePart +
" A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply."))
}
if (timedOut) {
a.onTimeout(timeout)
}
}
result.future.onComplete { _ =>
try a.stop()

View file

@ -27,7 +27,7 @@ import akka.actor.typed.internal.PoisonPillInterceptor
import akka.actor.typed.internal.adapter.ActorRefAdapter
import akka.actor.typed.internal.adapter.ActorSystemAdapter
import akka.actor.typed.scaladsl.Behaviors
import akka.annotation.InternalApi
import akka.annotation.{ InternalApi, InternalStableApi }
import akka.cluster.ClusterSettings.DataCenter
import akka.cluster.sharding.ShardCoordinator.LeastShardAllocationStrategy
import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy
@ -40,9 +40,8 @@ import akka.event.LoggingAdapter
import akka.japi.function.{ Function => JFunction }
import akka.pattern.AskTimeoutException
import akka.pattern.PromiseActorRef
import akka.util.ByteString
import akka.util.{ unused, ByteString, Timeout }
import akka.util.JavaDurationConverters._
import akka.util.Timeout
/**
* INTERNAL API
@ -311,8 +310,7 @@ import akka.util.Timeout
val replyTo = new EntityPromiseRef[U](shardRegion.asInstanceOf[InternalActorRef], timeout)
val m = message(replyTo.ref)
if (replyTo.promiseRef ne null) replyTo.promiseRef.messageClassName = m.getClass.getName
shardRegion ! ShardingEnvelope(entityId, m)
replyTo.future
replyTo.ask(shardRegion, entityId, m, timeout)
}
def ask[U](message: JFunction[ActorRef[U], M], timeout: Duration): CompletionStage[U] =
@ -349,6 +347,16 @@ import akka.util.Timeout
val ref: ActorRef[U] = _ref
val future: Future[U] = _future
val promiseRef: PromiseActorRef = _promiseRef
@InternalStableApi
private[akka] def ask[T](
shardRegion: akka.actor.ActorRef,
entityId: String,
message: T,
@unused timeout: Timeout): Future[U] = {
shardRegion ! ShardingEnvelope(entityId, message)
future
}
}
// impl InternalRecipientRef