diff --git a/akka-stream/src/main/mima-filters/2.6.14.backwards.excludes/30246-boundary-event-cancellation-hook.excludes b/akka-stream/src/main/mima-filters/2.6.14.backwards.excludes/30246-boundary-event-cancellation-hook.excludes new file mode 100644 index 0000000000..31a8f7eeca --- /dev/null +++ b/akka-stream/src/main/mima-filters/2.6.14.backwards.excludes/30246-boundary-event-cancellation-hook.excludes @@ -0,0 +1,2 @@ +# Changes to internal/private +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.impl.fusing.ActorGraphInterpreter#BoundaryEvent.cancel") \ No newline at end of file diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala index c542e1ab61..869315f5cd 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala @@ -45,7 +45,12 @@ import akka.util.OptionVal trait BoundaryEvent extends DeadLetterSuppression with NoSerializationVerificationNeeded { def shell: GraphInterpreterShell + + @InternalStableApi def execute(eventLimit: Int): Int + + @InternalStableApi + def cancel(): Unit } trait SimpleBoundaryEvent extends BoundaryEvent { @@ -81,6 +86,8 @@ import akka.util.OptionVal } override def logic: GraphStageLogic = BatchingActorInputBoundary.this + + override def cancel(): Unit = () } // can't be final because of SI-4440 case class OnComplete(shell: GraphInterpreterShell) extends SimpleBoundaryEvent { @@ -90,6 +97,8 @@ import akka.util.OptionVal } override def logic: GraphStageLogic = BatchingActorInputBoundary.this + + override def cancel(): Unit = () } // can't be final because of SI-4440 case class OnNext(shell: GraphInterpreterShell, e: Any) extends SimpleBoundaryEvent { @@ -99,6 +108,8 @@ import akka.util.OptionVal } override def logic: GraphStageLogic = BatchingActorInputBoundary.this + + override def cancel(): Unit = () } // can't be final because of SI-4440 case class OnSubscribe(shell: GraphInterpreterShell, subscription: Subscription) extends SimpleBoundaryEvent { @@ -109,6 +120,8 @@ import akka.util.OptionVal } override def logic: GraphStageLogic = BatchingActorInputBoundary.this + + override def cancel(): Unit = () } if (size <= 0) throw new IllegalArgumentException("buffer size cannot be zero") @@ -269,6 +282,8 @@ import akka.util.OptionVal override def shell: GraphInterpreterShell = boundary.shell override def logic: GraphStageLogic = boundary + + override def cancel(): Unit = () } final case class RequestMore(boundary: ActorOutputBoundary, demand: Long) extends SimpleBoundaryEvent { @@ -279,6 +294,7 @@ import akka.util.OptionVal } override def shell: GraphInterpreterShell = boundary.shell override def logic: GraphStageLogic = boundary + override def cancel(): Unit = () } final case class Cancel(boundary: ActorOutputBoundary, cause: Throwable) extends SimpleBoundaryEvent { override def execute(): Unit = { @@ -290,6 +306,7 @@ import akka.util.OptionVal override def shell: GraphInterpreterShell = boundary.shell override def logic: GraphStageLogic = boundary + override def cancel(): Unit = () } private[stream] class OutputBoundaryPublisher(boundary: ActorOutputBoundary, internalPortName: String) @@ -493,6 +510,8 @@ import akka.util.OptionVal promise: Promise[Done], handler: (Any) => Unit) extends BoundaryEvent { + + @InternalStableApi override def execute(eventLimit: Int): Int = { if (!waitingForShutdown) { interpreter.runAsyncInput(logic, evt, promise, handler) @@ -504,6 +523,8 @@ import akka.util.OptionVal eventLimit } } + + override def cancel(): Unit = () } // can't be final because of SI-4440 @@ -513,6 +534,8 @@ import akka.util.OptionVal if (GraphInterpreter.Debug) println(s"${interpreter.Name} resume") if (interpreter.isSuspended) runBatch(eventLimit) else eventLimit } else eventLimit + + override def cancel(): Unit = () } // can't be final because of SI-4440 @@ -528,6 +551,8 @@ import akka.util.OptionVal } 0 } + + override def cancel(): Unit = () } private var enqueueToShortCircuit: (Any) => Unit = _ @@ -781,6 +806,9 @@ import akka.util.OptionVal activeInterpreters -= shell if (activeInterpreters.isEmpty && newShells.isEmpty) context.stop(self) } + } else { + // signal to telemetry that this event won't be processed + b.cancel() } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala index 80d9307e53..0263a3014c 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala @@ -454,6 +454,7 @@ import akka.stream.stage._ eventsRemaining } + @InternalStableApi def runAsyncInput(logic: GraphStageLogic, evt: Any, promise: Promise[Done], handler: (Any) => Unit): Unit = if (!isStageCompleted(logic)) { if (GraphInterpreter.Debug) println(s"$Name ASYNC $evt ($handler) [$logic]")