diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 5b485d2d37..82168970cd 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -322,6 +322,7 @@ private[akka] trait Cell { * schedule the actor to run, depending on which type of cell it is. * Is only allowed to throw Fatal Throwables. */ + @InternalStableApi final def sendMessage(message: Any, sender: ActorRef): Unit = sendMessage(Envelope(message, sender, system)) diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala index 869315f5cd..b166e5d1db 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala @@ -829,7 +829,17 @@ import akka.util.OptionVal newShells.map(shell => shell.toSnapshot.asInstanceOf[UninitializedInterpreter])) } - override def postStop(): Unit = + override def postStop(): Unit = { + if (shortCircuitBuffer ne null) { + while (!shortCircuitBuffer.isEmpty) { + shortCircuitBuffer.poll() match { + case b: BoundaryEvent => + // signal to telemetry that this event won't be processed + b.cancel() + case _ => // ignore + } + } + } // avoid creating exception in happy case since it uses self.toString which is somewhat slow if (activeInterpreters.nonEmpty || newShells.nonEmpty) { val ex = AbruptTerminationException(self) @@ -837,4 +847,5 @@ import akka.util.OptionVal activeInterpreters = Set.empty[GraphInterpreterShell] newShells.foreach(s => if (tryInit(s)) s.tryAbort(ex)) } + } }