From 2ec2d9888850a3224e2c0ef8f4a59aac9654d3f9 Mon Sep 17 00:00:00 2001 From: Richard Imaoka Date: Fri, 16 Jun 2017 21:47:00 +0900 Subject: [PATCH] Use snapshot-after in config for periodical snapshots in PersistentFSM (#22605) * Use snapshot-after in config to periodical snapshot in PersistentFSM (#21563) * akka.persistence.fsm.snapshot-after is either off or a numerical value * Use Akka Extension for snapshot-after in PersistentFSM --- .../src/main/paradox/java/persistence.md | 20 +++++ .../src/main/paradox/scala/persistence.md | 20 +++++ .../src/main/resources/reference.conf | 9 ++ .../akka/persistence/fsm/PersistentFSM.scala | 42 ++++++++++ .../persistence/fsm/PersistentFSMSpec.scala | 84 ++++++++++++++++++- 5 files changed, 173 insertions(+), 2 deletions(-) diff --git a/akka-docs/src/main/paradox/java/persistence.md b/akka-docs/src/main/paradox/java/persistence.md index 8427e1aefb..34763ef5b8 100644 --- a/akka-docs/src/main/paradox/java/persistence.md +++ b/akka-docs/src/main/paradox/java/persistence.md @@ -790,6 +790,26 @@ A snapshot of state data can be persisted by calling the `saveStateSnapshot()` m On recovery state data is initialized according to the latest available snapshot, then the remaining domain events are replayed, triggering the `applyEvent` method. + +## Periodical snapshot by snapshot-after + +You can enable periodical `saveStateSnapshot()` calls in `PersistentFSM` if you turn the following flag on in `reference.conf` + +`akka.persistence.fsm.snapshot-after = 1000` + +this means `saveStateSnapshot()` is called after the sequence number reaches multiple of 1000. + +@@@ note + +`saveStateSnapshot()` might not be called exactly at sequence numbers being multiple of the `snapshot-after` configuration value. +This is because `PersistentFSM` works in a sort of "batch" mode when processing and persisting events, and `saveStateSnapshot()` +is called only at the end of the "batch". For example, if you set `akka.persistence.fsm.snapshot-after = 1000`, +it is possible that `saveStateSnapshot()` is called at `lastSequenceNr = 1005, 2003, ... ` +A single batch might persist state transition, also there could be multiple domain events to be persisted +if you pass them to `applying` method in the `PersistFSM` DSL. + +@@@ + ## Storage plugins Storage backends for journals and snapshot stores are pluggable in the Akka persistence extension. diff --git a/akka-docs/src/main/paradox/scala/persistence.md b/akka-docs/src/main/paradox/scala/persistence.md index 472104e349..a9c63bb891 100644 --- a/akka-docs/src/main/paradox/scala/persistence.md +++ b/akka-docs/src/main/paradox/scala/persistence.md @@ -798,6 +798,26 @@ A snapshot of state data can be persisted by calling the `saveStateSnapshot()` m On recovery state data is initialized according to the latest available snapshot, then the remaining domain events are replayed, triggering the `applyEvent` method. + +## Periodical snapshot by snapshot-after + +You can enable periodical `saveStateSnapshot()` calls in `PersistentFSM` if you turn the following flag on in `reference.conf` + +`akka.persistence.fsm.snapshot-after = 1000` + +this means `saveStateSnapshot()` is called after the sequence number reaches multiple of 1000. + +@@@ note + +`saveStateSnapshot()` might not be called exactly at sequence numbers being multiple of the `snapshot-after` configuration value. +This is because `PersistentFSM` works in a sort of "batch" mode when processing and persisting events, and `saveStateSnapshot()` +is called only at the end of the "batch". For example, if you set `akka.persistence.fsm.snapshot-after = 1000`, +it is possible that `saveStateSnapshot()` is called at `lastSequenceNr = 1005, 2003, ... ` +A single batch might persist state transition, also there could be multiple domain events to be persisted +if you pass them to `applying` method in the `PersistFSM` DSL. + +@@@ + ## Storage plugins diff --git a/akka-persistence/src/main/resources/reference.conf b/akka-persistence/src/main/resources/reference.conf index 176329368b..c867fc5034 100644 --- a/akka-persistence/src/main/resources/reference.conf +++ b/akka-persistence/src/main/resources/reference.conf @@ -179,6 +179,15 @@ akka.persistence { reset-timeout = 60s } } + + fsm { + # PersistentFSM saves snapshots after this number of persistent + # events. Snapshots are used to reduce recovery times. + # When you disable this feature, specify snapshot-after = off. + # To enable the feature, specify a number like snapshot-after = 1000 + # which means a snapshot is taken after persisting every 1000 events. + snapshot-after = off + } } # Protobuf serialization for the persistent extension messages. diff --git a/akka-persistence/src/main/scala/akka/persistence/fsm/PersistentFSM.scala b/akka-persistence/src/main/scala/akka/persistence/fsm/PersistentFSM.scala index 4046687f74..ca9be50ef0 100644 --- a/akka-persistence/src/main/scala/akka/persistence/fsm/PersistentFSM.scala +++ b/akka-persistence/src/main/scala/akka/persistence/fsm/PersistentFSM.scala @@ -9,12 +9,45 @@ import akka.annotation.InternalApi import akka.persistence.fsm.PersistentFSM.FSMState import akka.persistence.serialization.Message import akka.persistence.{ PersistentActor, RecoveryCompleted, SnapshotOffer } +import com.typesafe.config.Config import scala.annotation.varargs import scala.collection.immutable import scala.concurrent.duration._ import scala.reflect.ClassTag +/** + * SnapshotAfter Extension Id and factory for creating SnapshotAfter extension + */ +private[akka] object SnapshotAfter extends ExtensionId[SnapshotAfter] with ExtensionIdProvider { + override def get(system: ActorSystem): SnapshotAfter = super.get(system) + + override def lookup = SnapshotAfter + + override def createExtension(system: ExtendedActorSystem): SnapshotAfter = new SnapshotAfter(system.settings.config) +} + +/** + * SnapshotAfter enables PersistentFSM to take periodical snapshot. + * See `akka.persistence.fsm.snapshot-after` for configuration options. + */ +private[akka] class SnapshotAfter(config: Config) extends Extension { + val key = "akka.persistence.fsm.snapshot-after" + val snapshotAfterValue = config.getString(key).toLowerCase match { + case "off" ⇒ None + case _ ⇒ Some(config.getInt(key)) + } + + /** + * Function that takes lastSequenceNr as the param, and returns whether the passed + * sequence number should trigger auto snapshot or not + */ + val isSnapshotAfterSeqNo: Long ⇒ Boolean = snapshotAfterValue match { + case Some(snapShotAfterValue) ⇒ seqNo: Long ⇒ seqNo % snapShotAfterValue == 0 + case None ⇒ seqNo: Long ⇒ false //always false, if snapshotAfter is not specified in config + } +} + /** * A FSM implementation with persistent state. * @@ -112,20 +145,29 @@ trait PersistentFSM[S <: FSMState, D, E] extends PersistentActor with Persistent var nextData: D = stateData var handlersExecutedCounter = 0 + val snapshotAfterExtension = SnapshotAfter.get(context.system) + var doSnapshot: Boolean = false + def applyStateOnLastHandler() = { handlersExecutedCounter += 1 if (handlersExecutedCounter == eventsToPersist.size) { super.applyState(nextState using nextData) currentStateTimeout = nextState.timeout nextState.afterTransitionDo(stateData) + if (doSnapshot) { + log.info("Saving snapshot, sequence number [{}]", snapshotSequenceNr) + saveStateSnapshot() + } } } persistAll[Any](eventsToPersist) { case domainEventTag(event) ⇒ nextData = applyEvent(event, nextData) + doSnapshot = doSnapshot || snapshotAfterExtension.isSnapshotAfterSeqNo(lastSequenceNr) applyStateOnLastHandler() case StateChangeEvent(stateIdentifier, timeout) ⇒ + doSnapshot = doSnapshot || snapshotAfterExtension.isSnapshotAfterSeqNo(lastSequenceNr) applyStateOnLastHandler() } } diff --git a/akka-persistence/src/test/scala/akka/persistence/fsm/PersistentFSMSpec.scala b/akka-persistence/src/test/scala/akka/persistence/fsm/PersistentFSMSpec.scala index 83cf191a3f..377b2c2b77 100644 --- a/akka-persistence/src/test/scala/akka/persistence/fsm/PersistentFSMSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/fsm/PersistentFSMSpec.scala @@ -4,11 +4,15 @@ package akka.persistence.fsm -import akka.actor._ +import java.io.File + +import akka.actor.{ ActorSystem, _ } import akka.persistence._ import akka.persistence.fsm.PersistentFSM._ +import akka.persistence.fsm.PersistentFSMSpec.IntAdded import akka.testkit._ -import com.typesafe.config.Config +import com.typesafe.config.{ Config, ConfigFactory } +import org.apache.commons.io.FileUtils import scala.concurrent.duration._ import scala.language.postfixOps @@ -343,6 +347,41 @@ abstract class PersistentFSMSpec(config: Config) extends PersistenceSpec(config) } + "save periodical snapshots if akka.persistence.fsm.enable-snapshot-after = on" in { + val sys2 = ActorSystem( + "PersistentFsmSpec2", + ConfigFactory + .parseString(""" + akka.persistence.fsm.enable-snapshot-after = on + akka.persistence.fsm.snapshot-after = 3 + """).withFallback(PersistenceSpec.config("leveldb", "PersistentFSMSpec2"))) + + try { + val probe = TestProbe() + val fsmRef = sys2.actorOf(SnapshotFSM.props(probe.ref)) + + fsmRef ! 1 + fsmRef ! 2 + fsmRef ! 3 + // Needs to wait with expectMsg, before sending the next message to fsmRef. + // Otherwise, stateData sent to this probe is already updated + probe.expectMsg("SeqNo=3, StateData=List(3, 2, 1)") + + fsmRef ! "4x" //changes the state to Persist4xAtOnce, also updates SeqNo although nothing is persisted + fsmRef ! 10 //Persist4xAtOnce = persist 10, 4x times + // snapshot-after = 3, but the SeqNo is not multiple of 3, + // as saveStateSnapshot() is called at the end of persistent event "batch" = 4x of 10's. + probe.expectMsg("SeqNo=8, StateData=List(10, 10, 10, 10, 3, 2, 1)") + + } finally { + val storageLocations = List( + "akka.persistence.journal.leveldb.dir", + "akka.persistence.journal.leveldb-shared.store.dir", + "akka.persistence.snapshot-store.local.dir").map(s ⇒ new File(sys2.settings.config.getString(s))) + shutdown(sys2) + storageLocations.foreach(FileUtils.deleteDirectory) + } + } } } @@ -547,6 +586,47 @@ object PersistentFSMSpec { } } + sealed trait SnapshotFSMState extends PersistentFSM.FSMState + case object PersistSingleAtOnce extends SnapshotFSMState { override def identifier: String = "PersistSingleAtOnce" } + case object Persist4xAtOnce extends SnapshotFSMState { override def identifier: String = "Persist4xAtOnce" } + + sealed trait SnapshotFSMEvent + case class IntAdded(i: Int) extends SnapshotFSMEvent + + object SnapshotFSM { + def props(probe: ActorRef) = Props(new SnapshotFSM(probe)) + } + + class SnapshotFSM(probe: ActorRef)(implicit val domainEventClassTag: ClassTag[SnapshotFSMEvent]) + extends Actor with PersistentFSM[SnapshotFSMState, List[Int], SnapshotFSMEvent] { + + override def persistenceId: String = "snapshot-fsm-test" + + override def applyEvent(event: SnapshotFSMEvent, currentData: List[Int]): List[Int] = event match { + case IntAdded(i) ⇒ i :: currentData + } + + startWith(PersistSingleAtOnce, Nil) + + when(PersistSingleAtOnce) { + case Event(i: Int, _) ⇒ + stay applying IntAdded(i) + case Event("4x", _) ⇒ + goto(Persist4xAtOnce) + case Event(SaveSnapshotSuccess(metadata), _) ⇒ + probe ! s"SeqNo=${metadata.sequenceNr}, StateData=${stateData}" + stay() + } + + when(Persist4xAtOnce) { + case Event(i: Int, _) ⇒ + stay applying (IntAdded(i), IntAdded(i), IntAdded(i), IntAdded(i)) + case Event(SaveSnapshotSuccess(metadata), _) ⇒ + probe ! s"SeqNo=${metadata.sequenceNr}, StateData=${stateData}" + stay() + } + } + } class LeveldbPersistentFSMSpec extends PersistentFSMSpec(PersistenceSpec.config("leveldb", "PersistentFSMSpec"))