Fix stack overflow for dead letters message adapter, #31241 (#31249)

This commit is contained in:
Patrik Nordwall 2022-03-17 09:03:33 +01:00 committed by GitHub
parent 22089bd5cf
commit f3a8c9dc4e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 57 additions and 6 deletions

View file

@ -4,12 +4,17 @@
package akka.actor.typed.eventstream
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import scala.concurrent.duration._
import org.scalatest.wordspec.AnyWordSpecLike
import akka.actor.DeadLetter
import akka.actor.testkit.typed.scaladsl.{ ScalaTestWithActorTestKit, TestProbe }
import akka.actor.testkit.typed.scaladsl.LogCapturing
import akka.actor.typed.scaladsl.Behaviors
class EventStreamSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike with LogCapturing {
@ -43,6 +48,56 @@ class EventStreamSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike wit
testKit.system.eventStream ! Publish(EventObj)
eventObjListener.expectNoMessage(ShortWait)
}
"be subscribed by message adapter for DeadLetter" in {
val probe = createTestProbe[String]()
val ref = testKit.spawn(Behaviors.setup[String] { context =>
val adapter = context.messageAdapter[DeadLetter](d => d.message.toString)
context.system.eventStream ! Subscribe(adapter)
Behaviors.receiveMessage { msg =>
probe.ref ! msg
Behaviors.same
}
})
ref ! "init"
probe.expectMessage("init")
probe.expectNoMessage(100.millis) // might still be a risk that the eventStream ! Subscribe hasn't arrived
testKit.system.deadLetters ! "msg1"
testKit.system.deadLetters ! "msg2"
testKit.system.deadLetters ! "msg3"
probe.expectMessage("msg1")
probe.expectMessage("msg2")
probe.expectMessage("msg3")
}
"not cause infinite loop when stopping subscribed by message adapter for DeadLetter" in {
val latch = new CountDownLatch(1)
val ref = testKit.spawn(Behaviors.setup[String] { context =>
val adapter = context.messageAdapter[DeadLetter](d => d.message.toString)
context.system.eventStream ! Subscribe(adapter)
Behaviors.receiveMessage { msg =>
latch.await(10, TimeUnit.SECONDS)
if (msg == "stop")
Behaviors.stopped
else
Behaviors.same
}
})
ref ! "msg1"
ref ! "stop"
ref ! "msg2"
ref ! "msg3"
// msg2 and msg3 in mailbox, will be sent to dead letters
latch.countDown() // stop
testKit.createTestProbe().expectTerminated(ref)
}
}
"a system event stream subscriber" must {

View file

@ -240,10 +240,10 @@ private[akka] trait FaultHandling { this: ActorCell =>
try if (a ne null) a.aroundPostStop()
catch handleNonFatalOrInterruptedException { e =>
publish(Error(e, self.path.toString, clazz(a), e.getMessage))
} finally try dispatcher.detach(this)
} finally try stopFunctionRefs()
finally try dispatcher.detach(this)
finally try parent.sendSystemMessage(
DeathWatchNotification(self, existenceConfirmed = true, addressTerminated = false))
finally try stopFunctionRefs()
finally try tellWatchersWeDied()
finally try unwatchWatchedActors(a) // stay here as we expect an emergency stop from handleInvokeFailure
finally {

View file

@ -8,8 +8,6 @@ import java.util.concurrent.atomic.AtomicReference
import scala.annotation.tailrec
import scala.annotation.nowarn
import akka.actor.{ ActorRef, ActorSystem }
import akka.event.Logging.simpleName
import akka.util.Subclassification
@ -40,8 +38,6 @@ class EventStream(sys: ActorSystem, private val debug: Boolean) extends LoggingB
protected def classify(event: Any): Class[_] = event.getClass
// TODO consider avoiding the deprecated `isTerminated`?
@nowarn("msg=deprecated")
protected def publish(event: Any, subscriber: ActorRef) = {
if (sys == null && subscriber.isTerminated) unsubscribe(subscriber)
else subscriber ! event