diff --git a/akka-persistence-typed/src/main/mima-filters/2.6.8.backwards.excludes/issue-29401-stack-overflow.excludes b/akka-persistence-typed/src/main/mima-filters/2.6.8.backwards.excludes/issue-29401-stack-overflow.excludes new file mode 100644 index 0000000000..11be0f31c2 --- /dev/null +++ b/akka-persistence-typed/src/main/mima-filters/2.6.8.backwards.excludes/issue-29401-stack-overflow.excludes @@ -0,0 +1,2 @@ +# Internals only +ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.persistence.typed.internal.Running#HandlingCommands.applyEffects") \ No newline at end of file diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala index c1f4173801..e9456f9795 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala @@ -125,7 +125,9 @@ private[akka] object Running { def onCommand(state: RunningState[S], cmd: C): Behavior[InternalProtocol] = { val effect = setup.commandHandler(state.state, cmd) - applyEffects(cmd, state, effect.asInstanceOf[EffectImpl[E, S]]) // TODO can we avoid the cast? + val (next, doUnstash) = applyEffects(cmd, state, effect.asInstanceOf[EffectImpl[E, S]]) // TODO can we avoid the cast? + if (doUnstash) tryUnstashOne(next) + else next } // Used by EventSourcedBehaviorTestKit to retrieve the state. @@ -138,7 +140,7 @@ private[akka] object Running { msg: Any, state: RunningState[S], effect: Effect[E, S], - sideEffects: immutable.Seq[SideEffect[S]] = Nil): Behavior[InternalProtocol] = { + sideEffects: immutable.Seq[SideEffect[S]] = Nil): (Behavior[InternalProtocol], Boolean) = { if (setup.log.isDebugEnabled && !effect.isInstanceOf[CompositeEffect[_, _]]) setup.log.debugN( s"Handled command [{}], resulting effect: [{}], side effects: [{}]", @@ -165,7 +167,7 @@ private[akka] object Running { val shouldSnapshotAfterPersist = setup.shouldSnapshot(newState2.state, event, newState2.seqNr) - persistingEvents(newState2, state, numberOfEvents = 1, shouldSnapshotAfterPersist, sideEffects) + (persistingEvents(newState2, state, numberOfEvents = 1, shouldSnapshotAfterPersist, sideEffects), false) case PersistAll(events) => if (events.nonEmpty) { @@ -186,25 +188,25 @@ private[akka] object Running { val newState2 = internalPersistAll(setup.context, msg, newState, eventsToPersist) - persistingEvents(newState2, state, events.size, shouldSnapshotAfterPersist, sideEffects) + (persistingEvents(newState2, state, events.size, shouldSnapshotAfterPersist, sideEffects), false) } else { // run side-effects even when no events are emitted - tryUnstashOne(applySideEffects(sideEffects, state)) + (applySideEffects(sideEffects, state), true) } case _: PersistNothing.type => - tryUnstashOne(applySideEffects(sideEffects, state)) + (applySideEffects(sideEffects, state), true) case _: Unhandled.type => import akka.actor.typed.scaladsl.adapter._ setup.context.system.toClassic.eventStream .publish(UnhandledMessage(msg, setup.context.system.toClassic.deadLetters, setup.context.self.toClassic)) - tryUnstashOne(applySideEffects(sideEffects, state)) + (applySideEffects(sideEffects, state), true) case _: Stash.type => stashUser(IncomingCommand(msg)) - tryUnstashOne(applySideEffects(sideEffects, state)) + (applySideEffects(sideEffects, state), true) } } diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedStashOverflowSpec.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedStashOverflowSpec.scala new file mode 100644 index 0000000000..8dc2ca18e9 --- /dev/null +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedStashOverflowSpec.scala @@ -0,0 +1,89 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package akka.persistence.typed.scaladsl + +import akka.Done +import akka.actor.testkit.typed.scaladsl.LogCapturing +import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit +import akka.actor.typed.ActorRef +import akka.actor.typed.Behavior +import akka.actor.typed.scaladsl.adapter._ +import akka.persistence.journal.SteppingInmemJournal +import akka.persistence.typed.PersistenceId +import com.typesafe.config.ConfigFactory +import org.scalatest.wordspec.AnyWordSpecLike + +import scala.concurrent.duration._ + +// Reproducer for #29401 +object EventSourcedStashOverflowSpec { + + object EventSourcedStringList { + trait Command + case class DoNothing(replyTo: ActorRef[Done]) extends Command + + def apply(persistenceId: PersistenceId): Behavior[Command] = + EventSourcedBehavior[Command, String, List[String]]( + persistenceId, + Nil, { (_, command) => + command match { + case DoNothing(replyTo) => + Effect.persist(List.empty[String]).thenRun(_ => replyTo ! Done) + } + }, { (state, event) => + // original reproducer slept 2 seconds here but a pure application of an event seems unlikely to take that long + // so instead we delay recovery using a special journal + event :: state + }) + } + + def conf = + SteppingInmemJournal.config("EventSourcedStashOverflow").withFallback(ConfigFactory.parseString(s""" + akka.persistence { + typed { + stash-capacity = 1000 # enough to fail on stack size + stash-overflow-strategy = "drop" + } + } + """)) +} + +class EventSourcedStashOverflowSpec + extends ScalaTestWithActorTestKit(EventSourcedStashOverflowSpec.conf) + with AnyWordSpecLike + with LogCapturing { + + import EventSourcedStashOverflowSpec.EventSourcedStringList + + "Stashing in a busy event sourced behavior" must { + + "not cause stack overflow" in { + val es = spawn(EventSourcedStringList(PersistenceId.ofUniqueId("id-1"))) + + // wait for journal to start + val probe = testKit.createTestProbe[Done]() + probe.awaitAssert(SteppingInmemJournal.getRef("EventSourcedStashOverflow"), 3.seconds) + val journal = SteppingInmemJournal.getRef("EventSourcedStashOverflow") + + val droppedMessageProbe = testKit.createDroppedMessageProbe() + val stashCapacity = testKit.config.getInt("akka.persistence.typed.stash-capacity") + + for (_ <- 0 to (stashCapacity * 2)) { + es.tell(EventSourcedStringList.DoNothing(probe.ref)) + } + // capacity + 1 should mean that we get a dropped last message when all stash is filled + // while the actor is stuck in replay because journal isn't responding + droppedMessageProbe.receiveMessage() + implicit val classicSystem = testKit.system.toClassic + // we only need to do this one step and recovery completes + SteppingInmemJournal.step(journal) + + // exactly how many is racy but at least the first stash buffer full should complete + probe.receiveMessages(stashCapacity) + } + + } + +}