Fix typed persistence stack overflow with many read only commands (#1919)

* Update unstash stack overflow test to have it actually fail

Taken changes from https://github.com/apache/pekko/pull/1336 to have a test that fails

* Fix possible stack overflow in persistence-typed

This commit adds code to break recurrent calls in persistence-typed while unstashing read-only commands that could lead to a stack overflow, fixing issue #1327 (limited to EventSourcedBehavior)

The fix can be enabled using a feature flag, by default it is disabled

* bin compat exclude and scalafmt

* Also fix the same stack overflow issue in DurableStateBehavior

The fix is enabled by the same feature flag used by the fix of EventSourcedBehavior

* Enable by default the fix for the stack overflow

Also rename parameter

* Refactor code to make it more explicit that the old code path is unchanged

This commit changes how `onMessage` and `onCommand` are implemented to make it clearer that, when the `recurse-when-unstashing-read-only-commands` flag is set to true, the old code path is used.

Moreover, the while loop in onCommand has been changed into a tail recursive function

These changes have been applied to both EventSourcedBehavior and DurableStateBehavior

---------

Co-authored-by: PJ Fanning <pjfanning@users.noreply.github.com>
This commit is contained in:
Tomassino-ibm 2025-07-11 11:32:43 +02:00 committed by GitHub
parent ec7fdc7d0f
commit d89f2eec8b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
13 changed files with 375 additions and 66 deletions

View file

@ -55,10 +55,12 @@ object EventSourcedStashOverflowSpec {
SteppingInmemJournal.config("EventSourcedStashOverflow").withFallback(ConfigFactory.parseString(s"""
pekko.persistence {
typed {
stash-capacity = 1000 # enough to fail on stack size
stash-capacity = 20000 # enough to fail on stack size
stash-overflow-strategy = "drop"
recurse-when-unstashing-read-only-commands = false
}
}
pekko.jvm-exit-on-fatal-error = off
"""))
}
@ -85,8 +87,8 @@ class EventSourcedStashOverflowSpec
for (_ <- 0 to (stashCapacity * 2)) {
es.tell(EventSourcedStringList.DoNothing(probe.ref))
}
// capacity + 1 should mean that we get a dropped last message when all stash is filled
// while the actor is stuck in replay because journal isn't responding
// capacity * 2 should mean that we get many dropped messages when all stash is filled
// while the actor is stuck in replay because journal isn't responding (checking only one)
droppedMessageProbe.receiveMessage()
implicit val classicSystem: pekko.actor.ActorSystem =
testKit.system.toClassic

View file

@ -0,0 +1,133 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* license agreements; and to You under the Apache License, version 2.0:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* This file is part of the Apache Pekko project, which was derived from Akka.
*/
/*
* Copyright (C) 2020-2022 Lightbend Inc. <https://www.lightbend.com>
*/
package org.apache.pekko.persistence.typed.state.scaladsl
import org.apache.pekko
import pekko.Done
import pekko.actor.testkit.typed.scaladsl.LogCapturing
import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import pekko.actor.typed.ActorRef
import pekko.actor.typed.Behavior
import pekko.persistence.state.DurableStateStoreProvider
import pekko.persistence.state.scaladsl.{ DurableStateStore, DurableStateUpdateStore, GetObjectResult }
import pekko.persistence.state.javadsl.{ DurableStateStore => JDurableStateStore }
import pekko.persistence.typed.PersistenceId
import com.typesafe.config.ConfigFactory
import org.scalatest.wordspec.AnyWordSpecLike
import scala.concurrent.{ Future, Promise }
import scala.concurrent.duration._
object DurableStateBehaviorStashOverflowSpec {
class TestDurableStateStoreProvider extends DurableStateStoreProvider {
private val store = new TestDurableStateStorePlugin[Any]
override def scaladslDurableStateStore(): DurableStateStore[Any] = store
// Not used here
override def javadslDurableStateStore(): JDurableStateStore[AnyRef] = null
}
object TestDurableStateStorePlugin {
@volatile var instance: Option[TestDurableStateStorePlugin[_]] = None
def getInstance(): TestDurableStateStorePlugin[_] = instance.get
}
class TestDurableStateStorePlugin[A] extends DurableStateUpdateStore[A] {
TestDurableStateStorePlugin.instance = Some(this)
private val promise: Promise[Done] = Promise()
override def getObject(persistenceId: String): Future[GetObjectResult[A]] =
Future.successful(GetObjectResult[A](None, 0L))
override def upsertObject(persistenceId: String, revision: Long, value: A, tag: String): Future[Done] =
promise.future
def completeUpsertFuture(): Unit = promise.success(Done)
override def deleteObject(persistenceId: String): Future[Done] = Future.successful(Done)
override def deleteObject(persistenceId: String, revision: Long): Future[Done] = Future.successful(Done)
}
object DurableStateString {
sealed trait Command
case class DoNothing(replyTo: ActorRef[Done]) extends Command
case class DoPersist(replyTo: ActorRef[Done]) extends Command
def apply(persistenceId: PersistenceId): Behavior[Command] =
DurableStateBehavior[Command, String](
persistenceId,
"",
{ (_, command) =>
command match {
case DoNothing(replyTo) =>
Effect.reply(replyTo)(Done)
case DoPersist(replyTo) =>
Effect.persist("Initial persist").thenRun(_ => replyTo ! Done)
}
})
}
def conf = ConfigFactory.parseString(s"""
pekko.persistence {
state.plugin = "my-state-plugin"
typed {
stash-capacity = 20000 # enough to fail on stack size
stash-overflow-strategy = "drop"
recurse-when-unstashing-read-only-commands = false
}
}
pekko.jvm-exit-on-fatal-error = off
my-state-plugin.class = "${classOf[TestDurableStateStoreProvider].getName}"
""")
}
class DurableStateBehaviorStashOverflowSpec
extends ScalaTestWithActorTestKit(DurableStateBehaviorStashOverflowSpec.conf)
with AnyWordSpecLike
with LogCapturing {
import DurableStateBehaviorStashOverflowSpec._
"Stashing in a busy durable state behavior" must {
"not cause stack overflow" in {
val es = spawn(DurableStateString(PersistenceId.ofUniqueId("id-1")))
// wait for journal to start
val probe = testKit.createTestProbe[Done]()
val journal = probe.awaitAssert(TestDurableStateStorePlugin.getInstance(), 3.seconds)
val droppedMessageProbe = testKit.createDroppedMessageProbe()
val stashCapacity = testKit.config.getInt("pekko.persistence.typed.stash-capacity")
es.tell(DurableStateString.DoPersist(probe.ref))
for (_ <- 0 to (stashCapacity * 2)) {
es.tell(DurableStateString.DoNothing(probe.ref))
}
// capacity * 2 should mean that we get many dropped messages when all stash is filled
// while the actor is stuck in replay because journal isn't responding (checking only one)
droppedMessageProbe.receiveMessage()
journal.completeUpsertFuture()
// exactly how many is racy but at least the first stash buffer full should complete
probe.receiveMessages(stashCapacity)
}
}
}

View file

@ -0,0 +1,22 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# EventSourcedBehavior/DurableState Read-Only Events Stackoverflow
ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.persistence.typed.internal.Running#*")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.persistence.typed.internal.Running#*")
ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.persistence.typed.state.internal.Running#*")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.persistence.typed.state.internal.Running#*")

View file

@ -47,6 +47,13 @@ pekko.persistence.typed {
# this can be changed by setting this to 'true' in which case the internal logging is sent to
# the actor context logger.
use-context-logger-for-internal-logging = false
# Commands get stashed during persistence and un-stashed afterward. Normally, when many read-only
# (i.e. that do not cause any persistence) commands are present in the stash, the functions are called
# in a loop to unstash all of them. Setting this to true, will cause functions to be called
# recursively (that was the default before this setting was introduced). That might cause a stack
# overflow in case there are many messages to unstash.
recurse-when-unstashing-read-only-commands = false
}
pekko.reliable-delivery {

View file

@ -63,6 +63,9 @@ import pekko.annotation.InternalApi
val useContextLoggerForInternalLogging = typedConfig.getBoolean("use-context-logger-for-internal-logging")
val recurseWhenUnstashingReadOnlyCommands =
typedConfig.getBoolean("recurse-when-unstashing-read-only-commands")
EventSourcedSettings(
stashCapacity = stashCapacity,
stashOverflowStrategy,
@ -71,7 +74,8 @@ import pekko.annotation.InternalApi
snapshotPluginId,
journalPluginConfig,
snapshotPluginConfig,
useContextLoggerForInternalLogging)
useContextLoggerForInternalLogging,
recurseWhenUnstashingReadOnlyCommands)
}
}
@ -87,7 +91,8 @@ private[pekko] final case class EventSourcedSettings(
snapshotPluginId: String,
journalPluginConfig: Option[Config],
snapshotPluginConfig: Option[Config],
useContextLoggerForInternalLogging: Boolean) {
useContextLoggerForInternalLogging: Boolean,
recurseWhenUnstashingReadOnlyCommands: Boolean) {
require(journalPluginId != null, "journal plugin id must not be null; use empty string for 'default' journal")
require(

View file

@ -54,10 +54,10 @@ private[pekko] trait JournalInteractions[C, E, S] {
protected def internalPersist(
ctx: ActorContext[_],
cmd: Any,
state: Running.RunningState[S],
state: Running.RunningState[S, C],
event: EventOrTaggedOrReplicated,
eventAdapterManifest: String,
metadata: OptionVal[Any]): Running.RunningState[S] = {
metadata: OptionVal[Any]): Running.RunningState[S, C] = {
val newRunningState = state.nextSequenceNr()
@ -93,8 +93,8 @@ private[pekko] trait JournalInteractions[C, E, S] {
protected def internalPersistAll(
ctx: ActorContext[_],
cmd: Any,
state: Running.RunningState[S],
events: immutable.Seq[EventToPersist]): Running.RunningState[S] = {
state: Running.RunningState[S, C],
events: immutable.Seq[EventToPersist]): Running.RunningState[S, C] = {
if (events.nonEmpty) {
var newState = state
@ -202,7 +202,7 @@ private[pekko] trait SnapshotInteractions[C, E, S] {
setup.snapshotStore.tell(LoadSnapshot(setup.persistenceId.id, criteria, toSequenceNr), setup.selfClassic)
}
protected def internalSaveSnapshot(state: Running.RunningState[S]): Unit = {
protected def internalSaveSnapshot(state: Running.RunningState[S, C]): Unit = {
setup.internalLogger.debug("Saving snapshot sequenceNr [{}]", state.seqNr)
if (state.state == null)
throw new IllegalStateException("A snapshot must not be a null state.")

View file

@ -294,7 +294,7 @@ private[pekko] final class ReplayingEvents[C, E, S](
if (state.receivedPoisonPill && isInternalStashEmpty && !isUnstashAllInProgress)
Behaviors.stopped
else {
val runningState = Running.RunningState[S](
val runningState = Running.RunningState[S, C](
seqNr = state.seqNr,
state = state.state,
receivedPoisonPill = state.receivedPoisonPill,

View file

@ -97,21 +97,35 @@ private[pekko] object Running {
def currentSequenceNumber: Long
}
final case class RunningState[State](
// This is part of the fix for https://github.com/apache/pekko/issues/1327 and it's necessary to break
// recursion between the `onCommand` and `onMessage` functions in Running[C, E, S]#HandlingCommands. We
// set the flag `inOnCommandCall` to true in `onCommand` before calling functions that will in turn call
// `onMessage`. In `onMessage`, if the flag is set and we would recursively call `onCommand`, we instead
// save the parameters we would use to call `onCommand` and return. When back in `onCommand`, we check if
// `recOnCommandParams` is not empty and, if not, act as if `onMessage` called us directly. In function
// `onCommand` we have a while loop replacing recursive calls so that we don't use all the stack space in
// case a lot of read-only message are stashed
final class UnstashRecurrenceState[State, Command] {
var inOnCommandCall: Boolean = false
var recOnCommandParams: Option[(RunningState[State, Command], Command)] = None
}
final case class RunningState[State, Command](
seqNr: Long,
state: State,
receivedPoisonPill: Boolean,
version: VersionVector,
seenPerReplica: Map[ReplicaId, Long],
replicationControl: Map[ReplicaId, ReplicationStreamControl]) {
replicationControl: Map[ReplicaId, ReplicationStreamControl],
unstashRecurrenceState: UnstashRecurrenceState[State, Command] = new UnstashRecurrenceState[State, Command]) {
def nextSequenceNr(): RunningState[State] =
def nextSequenceNr(): RunningState[State, Command] =
copy(seqNr = seqNr + 1)
def updateLastSequenceNr(persistent: PersistentRepr): RunningState[State] =
def updateLastSequenceNr(persistent: PersistentRepr): RunningState[State, Command] =
if (persistent.sequenceNr > seqNr) copy(seqNr = persistent.sequenceNr) else this
def applyEvent[C, E](setup: BehaviorSetup[C, E, State], event: E): RunningState[State] = {
def applyEvent[E](setup: BehaviorSetup[Command, E, State], event: E): RunningState[State, Command] = {
val updated = setup.eventHandler(state, event)
copy(state = updated)
}
@ -119,8 +133,8 @@ private[pekko] object Running {
def startReplicationStream[C, E, S](
setup: BehaviorSetup[C, E, S],
state: RunningState[S],
replicationSetup: ReplicationSetup): RunningState[S] = {
state: RunningState[S, C],
replicationSetup: ReplicationSetup): RunningState[S, C] = {
import scala.concurrent.duration._
val system = setup.context.system
val ref = setup.context.self
@ -238,7 +252,7 @@ private[pekko] object Running {
// Needed for WithSeqNrAccessible, when unstashing
private var _currentSequenceNumber = 0L
final class HandlingCommands(state: RunningState[S])
final class HandlingCommands(state: RunningState[S, C])
extends AbstractBehavior[InternalProtocol](setup.context)
with WithSeqNrAccessible {
@ -249,7 +263,15 @@ private[pekko] object Running {
}
def onMessage(msg: InternalProtocol): Behavior[InternalProtocol] = msg match {
case IncomingCommand(c: C @unchecked) => onCommand(state, c)
case IncomingCommand(c: C @unchecked) if setup.settings.recurseWhenUnstashingReadOnlyCommands =>
onCommand(state, c)
case IncomingCommand(c: C @unchecked) =>
if (state.unstashRecurrenceState.inOnCommandCall) {
state.unstashRecurrenceState.recOnCommandParams = Some((state, c))
this // This will be ignored in onCommand
} else {
onCommand(state, c)
}
case re: ReplicatedEventEnvelope[E @unchecked] => onReplicatedEvent(state, re, setup.replication.get)
case pe: PublishedEventImpl => onPublishedEvent(state, pe)
case JournalResponse(r) => onDeleteEventsJournalResponse(r, state.state)
@ -268,15 +290,60 @@ private[pekko] object Running {
else Behaviors.unhandled
}
def onCommand(state: RunningState[S], cmd: C): Behavior[InternalProtocol] = {
val effect = setup.commandHandler(state.state, cmd)
val (next, doUnstash) = applyEffects(cmd, state, effect.asInstanceOf[EffectImpl[E, S]]) // TODO can we avoid the cast?
if (doUnstash) tryUnstashOne(next)
else next
def onCommand(state: RunningState[S, C], cmd: C): Behavior[InternalProtocol] = {
def callApplyEffects(rs: RunningState[S, C], c: C): (Behavior[InternalProtocol], Boolean) = {
val effect = setup.commandHandler(rs.state, c)
applyEffects(c, rs, effect.asInstanceOf[EffectImpl[E, S]]) // TODO can we avoid the cast?
}
def recursiveUnstashImpl(
applyEffectsRetval: (Behavior[InternalProtocol], Boolean)
): Behavior[InternalProtocol] = {
val (next, doUnstash) = applyEffectsRetval
if (doUnstash) tryUnstashOne(next)
else next
}
def nonRecursiveUnstashImpl(
applyEffectsRetval: (Behavior[InternalProtocol], Boolean)
): Behavior[InternalProtocol] = {
@tailrec
def loop(applyEffectsRetval: (Behavior[InternalProtocol], Boolean)): Behavior[InternalProtocol] = {
val (next, doUnstash) = applyEffectsRetval
if (doUnstash) {
state.unstashRecurrenceState.inOnCommandCall = true
val r = tryUnstashOne(next)
state.unstashRecurrenceState.inOnCommandCall = false
val recOnCommandParams = state.unstashRecurrenceState.recOnCommandParams
state.unstashRecurrenceState.recOnCommandParams = None
recOnCommandParams match {
case None => r
case Some((rs, c)) => loop(callApplyEffects(rs, c))
}
} else {
next
}
}
loop(applyEffectsRetval)
}
val r = callApplyEffects(state, cmd)
if (setup.settings.recurseWhenUnstashingReadOnlyCommands) {
recursiveUnstashImpl(r)
} else {
nonRecursiveUnstashImpl(r)
}
}
def onReplicatedEvent(
state: Running.RunningState[S],
state: Running.RunningState[S, C],
envelope: ReplicatedEventEnvelope[E],
replication: ReplicationSetup): Behavior[InternalProtocol] = {
setup.internalLogger.debugN(
@ -300,7 +367,7 @@ private[pekko] object Running {
}
}
def onPublishedEvent(state: Running.RunningState[S], event: PublishedEventImpl): Behavior[InternalProtocol] = {
def onPublishedEvent(state: Running.RunningState[S, C], event: PublishedEventImpl): Behavior[InternalProtocol] = {
val newBehavior: Behavior[InternalProtocol] = setup.replication match {
case None =>
setup.internalLogger.warn(
@ -321,7 +388,7 @@ private[pekko] object Running {
}
private def onPublishedEvent(
state: Running.RunningState[S],
state: Running.RunningState[S, C],
replication: ReplicationSetup,
replicatedMetadata: ReplicatedPublishedEventMetaData,
event: PublishedEventImpl): Behavior[InternalProtocol] = {
@ -428,7 +495,7 @@ private[pekko] object Running {
replication.clearContext()
val newState2: RunningState[S] = internalPersist(
val newState2: RunningState[S, C] = internalPersist(
setup.context,
null,
stateAfterApply,
@ -464,10 +531,10 @@ private[pekko] object Running {
val eventToPersist = adaptEvent(event)
val eventAdapterManifest = setup.eventAdapter.manifest(event)
val newState2 = setup.replication match {
val newState2: RunningState[S, C] = setup.replication match {
case Some(replication) =>
val updatedVersion = stateAfterApply.version.updated(replication.replicaId.id, _currentSequenceNumber)
val r = internalPersist(
val r: RunningState[S, C] = internalPersist(
setup.context,
cmd,
stateAfterApply,
@ -572,9 +639,10 @@ private[pekko] object Running {
(applySideEffects(sideEffects, state), true)
}
}
@tailrec def applyEffects(
msg: Any,
state: RunningState[S],
state: RunningState[S, C],
effect: Effect[E, S],
sideEffects: immutable.Seq[SideEffect[S]] = Nil): (Behavior[InternalProtocol], Boolean) = {
if (setup.internalLogger.isDebugEnabled && !effect.isInstanceOf[CompositeEffect[_, _]])
@ -630,8 +698,8 @@ private[pekko] object Running {
// ===============================================
def persistingEvents(
state: RunningState[S],
visibleState: RunningState[S], // previous state until write success
state: RunningState[S, C],
visibleState: RunningState[S, C], // previous state until write success
numberOfEvents: Int,
shouldSnapshotAfterPersist: SnapshotAfterPersist,
shouldPublish: Boolean,
@ -642,8 +710,8 @@ private[pekko] object Running {
/** INTERNAL API */
@InternalApi private[pekko] class PersistingEvents(
var state: RunningState[S],
var visibleState: RunningState[S], // previous state until write success
var state: RunningState[S, C],
var visibleState: RunningState[S, C], // previous state until write success
numberOfEvents: Int,
shouldSnapshotAfterPersist: SnapshotAfterPersist,
shouldPublish: Boolean,
@ -789,7 +857,7 @@ private[pekko] object Running {
/** INTERNAL API */
@InternalApi private[pekko] class StoringSnapshot(
state: RunningState[S],
state: RunningState[S, C],
sideEffects: immutable.Seq[SideEffect[S]],
snapshotReason: SnapshotAfterPersist)
extends AbstractBehavior[InternalProtocol](setup.context)
@ -886,7 +954,7 @@ private[pekko] object Running {
// --------------------------
def applySideEffects(effects: immutable.Seq[SideEffect[S]], state: RunningState[S]): Behavior[InternalProtocol] = {
def applySideEffects(effects: immutable.Seq[SideEffect[S]], state: RunningState[S, C]): Behavior[InternalProtocol] = {
var behavior: Behavior[InternalProtocol] = new HandlingCommands(state)
val it = effects.iterator
@ -905,7 +973,7 @@ private[pekko] object Running {
def applySideEffect(
effect: SideEffect[S],
state: RunningState[S],
state: RunningState[S, C],
behavior: Behavior[InternalProtocol]): Behavior[InternalProtocol] = {
effect match {
case _: Stop.type @unchecked =>

View file

@ -54,13 +54,17 @@ import pekko.persistence.Persistence
val useContextLoggerForInternalLogging = typedConfig.getBoolean("use-context-logger-for-internal-logging")
val recurseWhenUnstashingReadOnlyCommands =
typedConfig.getBoolean("recurse-when-unstashing-read-only-commands")
DurableStateSettings(
stashCapacity = stashCapacity,
stashOverflowStrategy,
logOnStashing = logOnStashing,
recoveryTimeout,
durableStateStorePluginId,
useContextLoggerForInternalLogging)
useContextLoggerForInternalLogging,
recurseWhenUnstashingReadOnlyCommands)
}
private def durableStateStoreConfigFor(config: Config, pluginId: String): Config = {
@ -87,7 +91,8 @@ private[pekko] final case class DurableStateSettings(
logOnStashing: Boolean,
recoveryTimeout: FiniteDuration,
durableStateStorePluginId: String,
useContextLoggerForInternalLogging: Boolean) {
useContextLoggerForInternalLogging: Boolean,
recurseWhenUnstashingReadOnlyCommands: Boolean) {
require(
durableStateStorePluginId != null,

View file

@ -49,8 +49,8 @@ private[pekko] trait DurableStateStoreInteractions[C, S] {
protected def internalUpsert(
ctx: ActorContext[InternalProtocol],
cmd: Any,
state: Running.RunningState[S],
value: Any): Running.RunningState[S] = {
state: Running.RunningState[S, C],
value: Any): Running.RunningState[S, C] = {
val newRunningState = state.nextRevision()
val persistenceId = setup.persistenceId.id
@ -69,9 +69,9 @@ private[pekko] trait DurableStateStoreInteractions[C, S] {
protected def internalDelete(
ctx: ActorContext[InternalProtocol],
cmd: Any,
state: Running.RunningState[S]): Running.RunningState[S] = {
state: Running.RunningState[S, C]): Running.RunningState[S, C] = {
val newRunningState = state.nextRevision().copy(state = setup.emptyState)
val newRunningState: Running.RunningState[S, C] = state.nextRevision().copy(state = setup.emptyState)
val persistenceId = setup.persistenceId.id
onDeleteInitiated(ctx, cmd)

View file

@ -183,7 +183,7 @@ private[pekko] class Recovering[C, S](
if (state.receivedPoisonPill && isInternalStashEmpty && !isUnstashAllInProgress)
Behaviors.stopped
else {
val runningState = Running.RunningState[S](
val runningState = Running.RunningState[S, C](
revision = state.revision,
state = state.state,
receivedPoisonPill = state.receivedPoisonPill)

View file

@ -55,12 +55,25 @@ private[pekko] object Running {
def currentRevision: Long
}
final case class RunningState[State](revision: Long, state: State, receivedPoisonPill: Boolean) {
// This is part of the fix for https://github.com/apache/pekko/issues/1327 and it's necessary to break
// recursion between the `onCommand` and `onMessage` functions in Running[C, E, S]#HandlingCommands.
// See comment in org.apache.pekko.persistence.typed.internal.Running#UnstashRecurrenceState for more
// information
final class UnstashRecurrenceState[State, Command] {
var inOnCommandCall: Boolean = false
var recOnCommandParams: Option[(RunningState[State, Command], Command)] = None
}
def nextRevision(): RunningState[State] =
final case class RunningState[State, Command](
revision: Long,
state: State,
receivedPoisonPill: Boolean,
unstashRecurrenceState: UnstashRecurrenceState[State, Command] = new UnstashRecurrenceState[State, Command]) {
def nextRevision(): RunningState[State, Command] =
copy(revision = revision + 1)
def applyState[C, E](@unused setup: BehaviorSetup[C, State], updated: State): RunningState[State] = {
def applyState(@unused setup: BehaviorSetup[Command, State], updated: State): RunningState[State, Command] = {
copy(state = updated)
}
}
@ -79,16 +92,24 @@ private[pekko] object Running {
// Needed for WithSeqNrAccessible, when unstashing
private var _currentRevision = 0L
final class HandlingCommands(state: RunningState[S])
final class HandlingCommands(state: RunningState[S, C])
extends AbstractBehavior[InternalProtocol](setup.context)
with WithRevisionAccessible {
_currentRevision = state.revision
def onMessage(msg: InternalProtocol): Behavior[InternalProtocol] = msg match {
case IncomingCommand(c: C @unchecked) => onCommand(state, c)
case get: GetState[S @unchecked] => onGetState(get)
case _ => Behaviors.unhandled
case IncomingCommand(c: C @unchecked) if setup.settings.recurseWhenUnstashingReadOnlyCommands =>
onCommand(state, c)
case IncomingCommand(c: C @unchecked) =>
if (state.unstashRecurrenceState.inOnCommandCall) {
state.unstashRecurrenceState.recOnCommandParams = Some((state, c))
this // This will be ignored in onCommand
} else {
onCommand(state, c)
}
case get: GetState[S @unchecked] => onGetState(get)
case _ => Behaviors.unhandled
}
override def onSignal: PartialFunction[Signal, Behavior[InternalProtocol]] = {
@ -100,11 +121,56 @@ private[pekko] object Running {
else Behaviors.unhandled
}
def onCommand(state: RunningState[S], cmd: C): Behavior[InternalProtocol] = {
val effect = setup.commandHandler(state.state, cmd)
val (next, doUnstash) = applyEffects(cmd, state, effect.asInstanceOf[EffectImpl[S]]) // TODO can we avoid the cast?
if (doUnstash) tryUnstashOne(next)
else next
def onCommand(state: RunningState[S, C], cmd: C): Behavior[InternalProtocol] = {
def callApplyEffects(rs: RunningState[S, C], c: C): (Behavior[InternalProtocol], Boolean) = {
val effect = setup.commandHandler(rs.state, c)
applyEffects(c, rs, effect.asInstanceOf[EffectImpl[S]]) // TODO can we avoid the cast?
}
def recursiveUnstashImpl(
applyEffectsRetval: (Behavior[InternalProtocol], Boolean)
): Behavior[InternalProtocol] = {
val (next, doUnstash) = applyEffectsRetval
if (doUnstash) tryUnstashOne(next)
else next
}
def nonRecursiveUnstashImpl(
applyEffectsRetval: (Behavior[InternalProtocol], Boolean)
): Behavior[InternalProtocol] = {
@tailrec
def loop(applyEffectsRetval: (Behavior[InternalProtocol], Boolean)): Behavior[InternalProtocol] = {
val (next, doUnstash) = applyEffectsRetval
if (doUnstash) {
state.unstashRecurrenceState.inOnCommandCall = true
val r = tryUnstashOne(next)
state.unstashRecurrenceState.inOnCommandCall = false
val recOnCommandParams = state.unstashRecurrenceState.recOnCommandParams
state.unstashRecurrenceState.recOnCommandParams = None
recOnCommandParams match {
case None => r
case Some((rs, c)) => loop(callApplyEffects(rs, c))
}
} else {
next
}
}
loop(applyEffectsRetval)
}
val r = callApplyEffects(state, cmd)
if (setup.settings.recurseWhenUnstashingReadOnlyCommands) {
recursiveUnstashImpl(r)
} else {
nonRecursiveUnstashImpl(r)
}
}
// Used by DurableStateBehaviorTestKit to retrieve the state.
@ -130,7 +196,7 @@ private[pekko] object Running {
@tailrec def applyEffects(
msg: Any,
state: RunningState[S],
state: RunningState[S, C],
effect: Effect[S],
sideEffects: immutable.Seq[SideEffect[S]] = Nil): (Behavior[InternalProtocol], Boolean) = {
if (setup.internalLogger.isDebugEnabled && !effect.isInstanceOf[CompositeEffect[_]])
@ -182,8 +248,8 @@ private[pekko] object Running {
// ===============================================
def persistingState(
state: RunningState[S],
visibleState: RunningState[S], // previous state until write success
state: RunningState[S, C],
visibleState: RunningState[S, C], // previous state until write success
sideEffects: immutable.Seq[SideEffect[S]]): Behavior[InternalProtocol] = {
setup.setMdcPhase(PersistenceMdc.PersistingState)
new PersistingState(state, visibleState, sideEffects)
@ -191,8 +257,8 @@ private[pekko] object Running {
/** INTERNAL API */
@InternalApi private[pekko] class PersistingState(
var state: RunningState[S],
var visibleState: RunningState[S], // previous state until write success
var state: RunningState[S, C],
var visibleState: RunningState[S, C], // previous state until write success
var sideEffects: immutable.Seq[SideEffect[S]],
persistStartTime: Long = System.nanoTime())
extends AbstractBehavior[InternalProtocol](setup.context)
@ -258,7 +324,7 @@ private[pekko] object Running {
// ===============================================
def applySideEffects(effects: immutable.Seq[SideEffect[S]], state: RunningState[S]): Behavior[InternalProtocol] = {
def applySideEffects(effects: immutable.Seq[SideEffect[S]], state: RunningState[S, C]): Behavior[InternalProtocol] = {
var behavior: Behavior[InternalProtocol] = new HandlingCommands(state)
val it = effects.iterator
@ -277,7 +343,7 @@ private[pekko] object Running {
def applySideEffect(
effect: SideEffect[S],
state: RunningState[S],
state: RunningState[S, C],
behavior: Behavior[InternalProtocol]): Behavior[InternalProtocol] = {
effect match {
case _: Stop.type @unchecked =>

View file

@ -78,6 +78,7 @@ class StashStateSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike with
snapshotPluginId = "",
journalPluginConfig = None,
snapshotPluginConfig = None,
useContextLoggerForInternalLogging = false)
useContextLoggerForInternalLogging = false,
recurseWhenUnstashingReadOnlyCommands = false)
}