Call BoundaryEvent cancellation hook for telemetry when ActorGraphInterpreter stops (#30259)
* Call BoundaryEvent cancellation hook for telemetry when ActorGraphInterpreter stops * Add missing annotation to ActorCell.sendMessage
This commit is contained in:
parent
3ae85e8cd0
commit
cbb12e6ef3
2 changed files with 13 additions and 1 deletions
|
|
@ -322,6 +322,7 @@ private[akka] trait Cell {
|
||||||
* schedule the actor to run, depending on which type of cell it is.
|
* schedule the actor to run, depending on which type of cell it is.
|
||||||
* Is only allowed to throw Fatal Throwables.
|
* Is only allowed to throw Fatal Throwables.
|
||||||
*/
|
*/
|
||||||
|
@InternalStableApi
|
||||||
final def sendMessage(message: Any, sender: ActorRef): Unit =
|
final def sendMessage(message: Any, sender: ActorRef): Unit =
|
||||||
sendMessage(Envelope(message, sender, system))
|
sendMessage(Envelope(message, sender, system))
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -829,7 +829,17 @@ import akka.util.OptionVal
|
||||||
newShells.map(shell => shell.toSnapshot.asInstanceOf[UninitializedInterpreter]))
|
newShells.map(shell => shell.toSnapshot.asInstanceOf[UninitializedInterpreter]))
|
||||||
}
|
}
|
||||||
|
|
||||||
override def postStop(): Unit =
|
override def postStop(): Unit = {
|
||||||
|
if (shortCircuitBuffer ne null) {
|
||||||
|
while (!shortCircuitBuffer.isEmpty) {
|
||||||
|
shortCircuitBuffer.poll() match {
|
||||||
|
case b: BoundaryEvent =>
|
||||||
|
// signal to telemetry that this event won't be processed
|
||||||
|
b.cancel()
|
||||||
|
case _ => // ignore
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
// avoid creating exception in happy case since it uses self.toString which is somewhat slow
|
// avoid creating exception in happy case since it uses self.toString which is somewhat slow
|
||||||
if (activeInterpreters.nonEmpty || newShells.nonEmpty) {
|
if (activeInterpreters.nonEmpty || newShells.nonEmpty) {
|
||||||
val ex = AbruptTerminationException(self)
|
val ex = AbruptTerminationException(self)
|
||||||
|
|
@ -837,4 +847,5 @@ import akka.util.OptionVal
|
||||||
activeInterpreters = Set.empty[GraphInterpreterShell]
|
activeInterpreters = Set.empty[GraphInterpreterShell]
|
||||||
newShells.foreach(s => if (tryInit(s)) s.tryAbort(ex))
|
newShells.foreach(s => if (tryInit(s)) s.tryAbort(ex))
|
||||||
}
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue