Add BoundaryEvent cancellation hook for telemetry (#30246)

This commit is contained in:
Yury Gribkov 2021-05-20 12:10:44 -04:00 committed by GitHub
parent 657386488d
commit 49759617ab
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 31 additions and 0 deletions

View file

@ -0,0 +1,2 @@
# Changes to internal/private
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.impl.fusing.ActorGraphInterpreter#BoundaryEvent.cancel")

View file

@ -45,7 +45,12 @@ import akka.util.OptionVal
trait BoundaryEvent extends DeadLetterSuppression with NoSerializationVerificationNeeded {
def shell: GraphInterpreterShell
@InternalStableApi
def execute(eventLimit: Int): Int
@InternalStableApi
def cancel(): Unit
}
trait SimpleBoundaryEvent extends BoundaryEvent {
@ -81,6 +86,8 @@ import akka.util.OptionVal
}
override def logic: GraphStageLogic = BatchingActorInputBoundary.this
override def cancel(): Unit = ()
}
// can't be final because of SI-4440
case class OnComplete(shell: GraphInterpreterShell) extends SimpleBoundaryEvent {
@ -90,6 +97,8 @@ import akka.util.OptionVal
}
override def logic: GraphStageLogic = BatchingActorInputBoundary.this
override def cancel(): Unit = ()
}
// can't be final because of SI-4440
case class OnNext(shell: GraphInterpreterShell, e: Any) extends SimpleBoundaryEvent {
@ -99,6 +108,8 @@ import akka.util.OptionVal
}
override def logic: GraphStageLogic = BatchingActorInputBoundary.this
override def cancel(): Unit = ()
}
// can't be final because of SI-4440
case class OnSubscribe(shell: GraphInterpreterShell, subscription: Subscription) extends SimpleBoundaryEvent {
@ -109,6 +120,8 @@ import akka.util.OptionVal
}
override def logic: GraphStageLogic = BatchingActorInputBoundary.this
override def cancel(): Unit = ()
}
if (size <= 0) throw new IllegalArgumentException("buffer size cannot be zero")
@ -269,6 +282,8 @@ import akka.util.OptionVal
override def shell: GraphInterpreterShell = boundary.shell
override def logic: GraphStageLogic = boundary
override def cancel(): Unit = ()
}
final case class RequestMore(boundary: ActorOutputBoundary, demand: Long) extends SimpleBoundaryEvent {
@ -279,6 +294,7 @@ import akka.util.OptionVal
}
override def shell: GraphInterpreterShell = boundary.shell
override def logic: GraphStageLogic = boundary
override def cancel(): Unit = ()
}
final case class Cancel(boundary: ActorOutputBoundary, cause: Throwable) extends SimpleBoundaryEvent {
override def execute(): Unit = {
@ -290,6 +306,7 @@ import akka.util.OptionVal
override def shell: GraphInterpreterShell = boundary.shell
override def logic: GraphStageLogic = boundary
override def cancel(): Unit = ()
}
private[stream] class OutputBoundaryPublisher(boundary: ActorOutputBoundary, internalPortName: String)
@ -493,6 +510,8 @@ import akka.util.OptionVal
promise: Promise[Done],
handler: (Any) => Unit)
extends BoundaryEvent {
@InternalStableApi
override def execute(eventLimit: Int): Int = {
if (!waitingForShutdown) {
interpreter.runAsyncInput(logic, evt, promise, handler)
@ -504,6 +523,8 @@ import akka.util.OptionVal
eventLimit
}
}
override def cancel(): Unit = ()
}
// can't be final because of SI-4440
@ -513,6 +534,8 @@ import akka.util.OptionVal
if (GraphInterpreter.Debug) println(s"${interpreter.Name} resume")
if (interpreter.isSuspended) runBatch(eventLimit) else eventLimit
} else eventLimit
override def cancel(): Unit = ()
}
// can't be final because of SI-4440
@ -528,6 +551,8 @@ import akka.util.OptionVal
}
0
}
override def cancel(): Unit = ()
}
private var enqueueToShortCircuit: (Any) => Unit = _
@ -781,6 +806,9 @@ import akka.util.OptionVal
activeInterpreters -= shell
if (activeInterpreters.isEmpty && newShells.isEmpty) context.stop(self)
}
} else {
// signal to telemetry that this event won't be processed
b.cancel()
}
}

View file

@ -454,6 +454,7 @@ import akka.stream.stage._
eventsRemaining
}
@InternalStableApi
def runAsyncInput(logic: GraphStageLogic, evt: Any, promise: Promise[Done], handler: (Any) => Unit): Unit =
if (!isStageCompleted(logic)) {
if (GraphInterpreter.Debug) println(s"$Name ASYNC $evt ($handler) [$logic]")