diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/eventstream/EventStreamSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/eventstream/EventStreamSpec.scala index 84f29984c7..70ab3cf535 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/eventstream/EventStreamSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/eventstream/EventStreamSpec.scala @@ -4,12 +4,17 @@ package akka.actor.typed.eventstream +import java.util.concurrent.CountDownLatch +import java.util.concurrent.TimeUnit + import scala.concurrent.duration._ import org.scalatest.wordspec.AnyWordSpecLike +import akka.actor.DeadLetter import akka.actor.testkit.typed.scaladsl.{ ScalaTestWithActorTestKit, TestProbe } import akka.actor.testkit.typed.scaladsl.LogCapturing +import akka.actor.typed.scaladsl.Behaviors class EventStreamSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike with LogCapturing { @@ -43,6 +48,56 @@ class EventStreamSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike wit testKit.system.eventStream ! Publish(EventObj) eventObjListener.expectNoMessage(ShortWait) } + + "be subscribed by message adapter for DeadLetter" in { + val probe = createTestProbe[String]() + val ref = testKit.spawn(Behaviors.setup[String] { context => + val adapter = context.messageAdapter[DeadLetter](d => d.message.toString) + context.system.eventStream ! Subscribe(adapter) + + Behaviors.receiveMessage { msg => + probe.ref ! msg + Behaviors.same + } + }) + + ref ! "init" + probe.expectMessage("init") + probe.expectNoMessage(100.millis) // might still be a risk that the eventStream ! Subscribe hasn't arrived + + testKit.system.deadLetters ! "msg1" + testKit.system.deadLetters ! "msg2" + testKit.system.deadLetters ! "msg3" + + probe.expectMessage("msg1") + probe.expectMessage("msg2") + probe.expectMessage("msg3") + } + + "not cause infinite loop when stopping subscribed by message adapter for DeadLetter" in { + val latch = new CountDownLatch(1) + val ref = testKit.spawn(Behaviors.setup[String] { context => + val adapter = context.messageAdapter[DeadLetter](d => d.message.toString) + context.system.eventStream ! Subscribe(adapter) + + Behaviors.receiveMessage { msg => + latch.await(10, TimeUnit.SECONDS) + if (msg == "stop") + Behaviors.stopped + else + Behaviors.same + } + }) + + ref ! "msg1" + ref ! "stop" + ref ! "msg2" + ref ! "msg3" + // msg2 and msg3 in mailbox, will be sent to dead letters + latch.countDown() // stop + + testKit.createTestProbe().expectTerminated(ref) + } } "a system event stream subscriber" must { diff --git a/akka-actor/src/main/scala/akka/actor/dungeon/FaultHandling.scala b/akka-actor/src/main/scala/akka/actor/dungeon/FaultHandling.scala index 33b3896d63..2fc87234e2 100644 --- a/akka-actor/src/main/scala/akka/actor/dungeon/FaultHandling.scala +++ b/akka-actor/src/main/scala/akka/actor/dungeon/FaultHandling.scala @@ -240,10 +240,10 @@ private[akka] trait FaultHandling { this: ActorCell => try if (a ne null) a.aroundPostStop() catch handleNonFatalOrInterruptedException { e => publish(Error(e, self.path.toString, clazz(a), e.getMessage)) - } finally try dispatcher.detach(this) + } finally try stopFunctionRefs() + finally try dispatcher.detach(this) finally try parent.sendSystemMessage( DeathWatchNotification(self, existenceConfirmed = true, addressTerminated = false)) - finally try stopFunctionRefs() finally try tellWatchersWeDied() finally try unwatchWatchedActors(a) // stay here as we expect an emergency stop from handleInvokeFailure finally { diff --git a/akka-actor/src/main/scala/akka/event/EventStream.scala b/akka-actor/src/main/scala/akka/event/EventStream.scala index 6daed2e769..7c62e7eccc 100644 --- a/akka-actor/src/main/scala/akka/event/EventStream.scala +++ b/akka-actor/src/main/scala/akka/event/EventStream.scala @@ -8,8 +8,6 @@ import java.util.concurrent.atomic.AtomicReference import scala.annotation.tailrec -import scala.annotation.nowarn - import akka.actor.{ ActorRef, ActorSystem } import akka.event.Logging.simpleName import akka.util.Subclassification @@ -40,8 +38,6 @@ class EventStream(sys: ActorSystem, private val debug: Boolean) extends LoggingB protected def classify(event: Any): Class[_] = event.getClass - // TODO consider avoiding the deprecated `isTerminated`? - @nowarn("msg=deprecated") protected def publish(event: Any, subscriber: ActorRef) = { if (sys == null && subscriber.isTerminated) unsubscribe(subscriber) else subscriber ! event