Use snapshot-after in config for periodical snapshots in PersistentFSM (#22605)

* Use snapshot-after in config to periodical snapshot in PersistentFSM (#21563)

* akka.persistence.fsm.snapshot-after is either off or a numerical value

* Use Akka Extension for snapshot-after in PersistentFSM
This commit is contained in:
Richard Imaoka 2017-06-16 21:47:00 +09:00 committed by Patrik Nordwall
parent 3c8f6dacd8
commit 2ec2d98888
5 changed files with 173 additions and 2 deletions

View file

@ -790,6 +790,26 @@ A snapshot of state data can be persisted by calling the `saveStateSnapshot()` m
On recovery state data is initialized according to the latest available snapshot, then the remaining domain events are replayed, triggering the
`applyEvent` method.
<a id="periodical-snapshot"></a>
## Periodical snapshot by snapshot-after
You can enable periodical `saveStateSnapshot()` calls in `PersistentFSM` if you turn the following flag on in `reference.conf`
`akka.persistence.fsm.snapshot-after = 1000`
this means `saveStateSnapshot()` is called after the sequence number reaches multiple of 1000.
@@@ note
`saveStateSnapshot()` might not be called exactly at sequence numbers being multiple of the `snapshot-after` configuration value.
This is because `PersistentFSM` works in a sort of "batch" mode when processing and persisting events, and `saveStateSnapshot()`
is called only at the end of the "batch". For example, if you set `akka.persistence.fsm.snapshot-after = 1000`,
it is possible that `saveStateSnapshot()` is called at `lastSequenceNr = 1005, 2003, ... `
A single batch might persist state transition, also there could be multiple domain events to be persisted
if you pass them to `applying` method in the `PersistFSM` DSL.
@@@
## Storage plugins
Storage backends for journals and snapshot stores are pluggable in the Akka persistence extension.

View file

@ -798,6 +798,26 @@ A snapshot of state data can be persisted by calling the `saveStateSnapshot()` m
On recovery state data is initialized according to the latest available snapshot, then the remaining domain events are replayed, triggering the
`applyEvent` method.
<a id="periodical-snapshot"></a>
## Periodical snapshot by snapshot-after
You can enable periodical `saveStateSnapshot()` calls in `PersistentFSM` if you turn the following flag on in `reference.conf`
`akka.persistence.fsm.snapshot-after = 1000`
this means `saveStateSnapshot()` is called after the sequence number reaches multiple of 1000.
@@@ note
`saveStateSnapshot()` might not be called exactly at sequence numbers being multiple of the `snapshot-after` configuration value.
This is because `PersistentFSM` works in a sort of "batch" mode when processing and persisting events, and `saveStateSnapshot()`
is called only at the end of the "batch". For example, if you set `akka.persistence.fsm.snapshot-after = 1000`,
it is possible that `saveStateSnapshot()` is called at `lastSequenceNr = 1005, 2003, ... `
A single batch might persist state transition, also there could be multiple domain events to be persisted
if you pass them to `applying` method in the `PersistFSM` DSL.
@@@
<a id="storage-plugins"></a>
## Storage plugins

View file

@ -179,6 +179,15 @@ akka.persistence {
reset-timeout = 60s
}
}
fsm {
# PersistentFSM saves snapshots after this number of persistent
# events. Snapshots are used to reduce recovery times.
# When you disable this feature, specify snapshot-after = off.
# To enable the feature, specify a number like snapshot-after = 1000
# which means a snapshot is taken after persisting every 1000 events.
snapshot-after = off
}
}
# Protobuf serialization for the persistent extension messages.

View file

@ -9,12 +9,45 @@ import akka.annotation.InternalApi
import akka.persistence.fsm.PersistentFSM.FSMState
import akka.persistence.serialization.Message
import akka.persistence.{ PersistentActor, RecoveryCompleted, SnapshotOffer }
import com.typesafe.config.Config
import scala.annotation.varargs
import scala.collection.immutable
import scala.concurrent.duration._
import scala.reflect.ClassTag
/**
* SnapshotAfter Extension Id and factory for creating SnapshotAfter extension
*/
private[akka] object SnapshotAfter extends ExtensionId[SnapshotAfter] with ExtensionIdProvider {
override def get(system: ActorSystem): SnapshotAfter = super.get(system)
override def lookup = SnapshotAfter
override def createExtension(system: ExtendedActorSystem): SnapshotAfter = new SnapshotAfter(system.settings.config)
}
/**
* SnapshotAfter enables PersistentFSM to take periodical snapshot.
* See `akka.persistence.fsm.snapshot-after` for configuration options.
*/
private[akka] class SnapshotAfter(config: Config) extends Extension {
val key = "akka.persistence.fsm.snapshot-after"
val snapshotAfterValue = config.getString(key).toLowerCase match {
case "off" None
case _ Some(config.getInt(key))
}
/**
* Function that takes lastSequenceNr as the param, and returns whether the passed
* sequence number should trigger auto snapshot or not
*/
val isSnapshotAfterSeqNo: Long Boolean = snapshotAfterValue match {
case Some(snapShotAfterValue) seqNo: Long seqNo % snapShotAfterValue == 0
case None seqNo: Long false //always false, if snapshotAfter is not specified in config
}
}
/**
* A FSM implementation with persistent state.
*
@ -112,20 +145,29 @@ trait PersistentFSM[S <: FSMState, D, E] extends PersistentActor with Persistent
var nextData: D = stateData
var handlersExecutedCounter = 0
val snapshotAfterExtension = SnapshotAfter.get(context.system)
var doSnapshot: Boolean = false
def applyStateOnLastHandler() = {
handlersExecutedCounter += 1
if (handlersExecutedCounter == eventsToPersist.size) {
super.applyState(nextState using nextData)
currentStateTimeout = nextState.timeout
nextState.afterTransitionDo(stateData)
if (doSnapshot) {
log.info("Saving snapshot, sequence number [{}]", snapshotSequenceNr)
saveStateSnapshot()
}
}
}
persistAll[Any](eventsToPersist) {
case domainEventTag(event)
nextData = applyEvent(event, nextData)
doSnapshot = doSnapshot || snapshotAfterExtension.isSnapshotAfterSeqNo(lastSequenceNr)
applyStateOnLastHandler()
case StateChangeEvent(stateIdentifier, timeout)
doSnapshot = doSnapshot || snapshotAfterExtension.isSnapshotAfterSeqNo(lastSequenceNr)
applyStateOnLastHandler()
}
}

View file

@ -4,11 +4,15 @@
package akka.persistence.fsm
import akka.actor._
import java.io.File
import akka.actor.{ ActorSystem, _ }
import akka.persistence._
import akka.persistence.fsm.PersistentFSM._
import akka.persistence.fsm.PersistentFSMSpec.IntAdded
import akka.testkit._
import com.typesafe.config.Config
import com.typesafe.config.{ Config, ConfigFactory }
import org.apache.commons.io.FileUtils
import scala.concurrent.duration._
import scala.language.postfixOps
@ -343,6 +347,41 @@ abstract class PersistentFSMSpec(config: Config) extends PersistenceSpec(config)
}
"save periodical snapshots if akka.persistence.fsm.enable-snapshot-after = on" in {
val sys2 = ActorSystem(
"PersistentFsmSpec2",
ConfigFactory
.parseString("""
akka.persistence.fsm.enable-snapshot-after = on
akka.persistence.fsm.snapshot-after = 3
""").withFallback(PersistenceSpec.config("leveldb", "PersistentFSMSpec2")))
try {
val probe = TestProbe()
val fsmRef = sys2.actorOf(SnapshotFSM.props(probe.ref))
fsmRef ! 1
fsmRef ! 2
fsmRef ! 3
// Needs to wait with expectMsg, before sending the next message to fsmRef.
// Otherwise, stateData sent to this probe is already updated
probe.expectMsg("SeqNo=3, StateData=List(3, 2, 1)")
fsmRef ! "4x" //changes the state to Persist4xAtOnce, also updates SeqNo although nothing is persisted
fsmRef ! 10 //Persist4xAtOnce = persist 10, 4x times
// snapshot-after = 3, but the SeqNo is not multiple of 3,
// as saveStateSnapshot() is called at the end of persistent event "batch" = 4x of 10's.
probe.expectMsg("SeqNo=8, StateData=List(10, 10, 10, 10, 3, 2, 1)")
} finally {
val storageLocations = List(
"akka.persistence.journal.leveldb.dir",
"akka.persistence.journal.leveldb-shared.store.dir",
"akka.persistence.snapshot-store.local.dir").map(s new File(sys2.settings.config.getString(s)))
shutdown(sys2)
storageLocations.foreach(FileUtils.deleteDirectory)
}
}
}
}
@ -547,6 +586,47 @@ object PersistentFSMSpec {
}
}
sealed trait SnapshotFSMState extends PersistentFSM.FSMState
case object PersistSingleAtOnce extends SnapshotFSMState { override def identifier: String = "PersistSingleAtOnce" }
case object Persist4xAtOnce extends SnapshotFSMState { override def identifier: String = "Persist4xAtOnce" }
sealed trait SnapshotFSMEvent
case class IntAdded(i: Int) extends SnapshotFSMEvent
object SnapshotFSM {
def props(probe: ActorRef) = Props(new SnapshotFSM(probe))
}
class SnapshotFSM(probe: ActorRef)(implicit val domainEventClassTag: ClassTag[SnapshotFSMEvent])
extends Actor with PersistentFSM[SnapshotFSMState, List[Int], SnapshotFSMEvent] {
override def persistenceId: String = "snapshot-fsm-test"
override def applyEvent(event: SnapshotFSMEvent, currentData: List[Int]): List[Int] = event match {
case IntAdded(i) i :: currentData
}
startWith(PersistSingleAtOnce, Nil)
when(PersistSingleAtOnce) {
case Event(i: Int, _)
stay applying IntAdded(i)
case Event("4x", _)
goto(Persist4xAtOnce)
case Event(SaveSnapshotSuccess(metadata), _)
probe ! s"SeqNo=${metadata.sequenceNr}, StateData=${stateData}"
stay()
}
when(Persist4xAtOnce) {
case Event(i: Int, _)
stay applying (IntAdded(i), IntAdded(i), IntAdded(i), IntAdded(i))
case Event(SaveSnapshotSuccess(metadata), _)
probe ! s"SeqNo=${metadata.sequenceNr}, StateData=${stateData}"
stay()
}
}
}
class LeveldbPersistentFSMSpec extends PersistentFSMSpec(PersistenceSpec.config("leveldb", "PersistentFSMSpec"))