Avoid non-tail recursion when stashed no-effect messages are processed (#29437)

* Avoid non-tail recursion when stashed no-effect messages are processed #29401

* MiMa filter

* revert PersistAll(empty) => none

Co-authored-by: Patrik Nordwall <patrik.nordwall@gmail.com>
This commit is contained in:
Johan Andrén 2020-08-04 13:54:05 +02:00 committed by GitHub
parent 327e16980d
commit 3181cbbbda
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 101 additions and 8 deletions

View file

@ -0,0 +1,2 @@
# Internals only
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.persistence.typed.internal.Running#HandlingCommands.applyEffects")

View file

@ -125,7 +125,9 @@ private[akka] object Running {
def onCommand(state: RunningState[S], cmd: C): Behavior[InternalProtocol] = {
val effect = setup.commandHandler(state.state, cmd)
applyEffects(cmd, state, effect.asInstanceOf[EffectImpl[E, S]]) // TODO can we avoid the cast?
val (next, doUnstash) = applyEffects(cmd, state, effect.asInstanceOf[EffectImpl[E, S]]) // TODO can we avoid the cast?
if (doUnstash) tryUnstashOne(next)
else next
}
// Used by EventSourcedBehaviorTestKit to retrieve the state.
@ -138,7 +140,7 @@ private[akka] object Running {
msg: Any,
state: RunningState[S],
effect: Effect[E, S],
sideEffects: immutable.Seq[SideEffect[S]] = Nil): Behavior[InternalProtocol] = {
sideEffects: immutable.Seq[SideEffect[S]] = Nil): (Behavior[InternalProtocol], Boolean) = {
if (setup.log.isDebugEnabled && !effect.isInstanceOf[CompositeEffect[_, _]])
setup.log.debugN(
s"Handled command [{}], resulting effect: [{}], side effects: [{}]",
@ -165,7 +167,7 @@ private[akka] object Running {
val shouldSnapshotAfterPersist = setup.shouldSnapshot(newState2.state, event, newState2.seqNr)
persistingEvents(newState2, state, numberOfEvents = 1, shouldSnapshotAfterPersist, sideEffects)
(persistingEvents(newState2, state, numberOfEvents = 1, shouldSnapshotAfterPersist, sideEffects), false)
case PersistAll(events) =>
if (events.nonEmpty) {
@ -186,25 +188,25 @@ private[akka] object Running {
val newState2 = internalPersistAll(setup.context, msg, newState, eventsToPersist)
persistingEvents(newState2, state, events.size, shouldSnapshotAfterPersist, sideEffects)
(persistingEvents(newState2, state, events.size, shouldSnapshotAfterPersist, sideEffects), false)
} else {
// run side-effects even when no events are emitted
tryUnstashOne(applySideEffects(sideEffects, state))
(applySideEffects(sideEffects, state), true)
}
case _: PersistNothing.type =>
tryUnstashOne(applySideEffects(sideEffects, state))
(applySideEffects(sideEffects, state), true)
case _: Unhandled.type =>
import akka.actor.typed.scaladsl.adapter._
setup.context.system.toClassic.eventStream
.publish(UnhandledMessage(msg, setup.context.system.toClassic.deadLetters, setup.context.self.toClassic))
tryUnstashOne(applySideEffects(sideEffects, state))
(applySideEffects(sideEffects, state), true)
case _: Stash.type =>
stashUser(IncomingCommand(msg))
tryUnstashOne(applySideEffects(sideEffects, state))
(applySideEffects(sideEffects, state), true)
}
}

View file

@ -0,0 +1,89 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.typed.scaladsl
import akka.Done
import akka.actor.testkit.typed.scaladsl.LogCapturing
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.adapter._
import akka.persistence.journal.SteppingInmemJournal
import akka.persistence.typed.PersistenceId
import com.typesafe.config.ConfigFactory
import org.scalatest.wordspec.AnyWordSpecLike
import scala.concurrent.duration._
// Reproducer for #29401
object EventSourcedStashOverflowSpec {
object EventSourcedStringList {
trait Command
case class DoNothing(replyTo: ActorRef[Done]) extends Command
def apply(persistenceId: PersistenceId): Behavior[Command] =
EventSourcedBehavior[Command, String, List[String]](
persistenceId,
Nil, { (_, command) =>
command match {
case DoNothing(replyTo) =>
Effect.persist(List.empty[String]).thenRun(_ => replyTo ! Done)
}
}, { (state, event) =>
// original reproducer slept 2 seconds here but a pure application of an event seems unlikely to take that long
// so instead we delay recovery using a special journal
event :: state
})
}
def conf =
SteppingInmemJournal.config("EventSourcedStashOverflow").withFallback(ConfigFactory.parseString(s"""
akka.persistence {
typed {
stash-capacity = 1000 # enough to fail on stack size
stash-overflow-strategy = "drop"
}
}
"""))
}
class EventSourcedStashOverflowSpec
extends ScalaTestWithActorTestKit(EventSourcedStashOverflowSpec.conf)
with AnyWordSpecLike
with LogCapturing {
import EventSourcedStashOverflowSpec.EventSourcedStringList
"Stashing in a busy event sourced behavior" must {
"not cause stack overflow" in {
val es = spawn(EventSourcedStringList(PersistenceId.ofUniqueId("id-1")))
// wait for journal to start
val probe = testKit.createTestProbe[Done]()
probe.awaitAssert(SteppingInmemJournal.getRef("EventSourcedStashOverflow"), 3.seconds)
val journal = SteppingInmemJournal.getRef("EventSourcedStashOverflow")
val droppedMessageProbe = testKit.createDroppedMessageProbe()
val stashCapacity = testKit.config.getInt("akka.persistence.typed.stash-capacity")
for (_ <- 0 to (stashCapacity * 2)) {
es.tell(EventSourcedStringList.DoNothing(probe.ref))
}
// capacity + 1 should mean that we get a dropped last message when all stash is filled
// while the actor is stuck in replay because journal isn't responding
droppedMessageProbe.receiveMessage()
implicit val classicSystem = testKit.system.toClassic
// we only need to do this one step and recovery completes
SteppingInmemJournal.step(journal)
// exactly how many is racy but at least the first stash buffer full should complete
probe.receiveMessages(stashCapacity)
}
}
}