diff --git a/akka-docs/src/main/paradox/java/persistence.md b/akka-docs/src/main/paradox/java/persistence.md
index 8427e1aefb..34763ef5b8 100644
--- a/akka-docs/src/main/paradox/java/persistence.md
+++ b/akka-docs/src/main/paradox/java/persistence.md
@@ -790,6 +790,26 @@ A snapshot of state data can be persisted by calling the `saveStateSnapshot()` m
On recovery state data is initialized according to the latest available snapshot, then the remaining domain events are replayed, triggering the
`applyEvent` method.
+
+## Periodical snapshot by snapshot-after
+
+You can enable periodical `saveStateSnapshot()` calls in `PersistentFSM` if you turn the following flag on in `reference.conf`
+
+`akka.persistence.fsm.snapshot-after = 1000`
+
+this means `saveStateSnapshot()` is called after the sequence number reaches multiple of 1000.
+
+@@@ note
+
+`saveStateSnapshot()` might not be called exactly at sequence numbers being multiple of the `snapshot-after` configuration value.
+This is because `PersistentFSM` works in a sort of "batch" mode when processing and persisting events, and `saveStateSnapshot()`
+is called only at the end of the "batch". For example, if you set `akka.persistence.fsm.snapshot-after = 1000`,
+it is possible that `saveStateSnapshot()` is called at `lastSequenceNr = 1005, 2003, ... `
+A single batch might persist state transition, also there could be multiple domain events to be persisted
+if you pass them to `applying` method in the `PersistFSM` DSL.
+
+@@@
+
## Storage plugins
Storage backends for journals and snapshot stores are pluggable in the Akka persistence extension.
diff --git a/akka-docs/src/main/paradox/scala/persistence.md b/akka-docs/src/main/paradox/scala/persistence.md
index 472104e349..a9c63bb891 100644
--- a/akka-docs/src/main/paradox/scala/persistence.md
+++ b/akka-docs/src/main/paradox/scala/persistence.md
@@ -798,6 +798,26 @@ A snapshot of state data can be persisted by calling the `saveStateSnapshot()` m
On recovery state data is initialized according to the latest available snapshot, then the remaining domain events are replayed, triggering the
`applyEvent` method.
+
+## Periodical snapshot by snapshot-after
+
+You can enable periodical `saveStateSnapshot()` calls in `PersistentFSM` if you turn the following flag on in `reference.conf`
+
+`akka.persistence.fsm.snapshot-after = 1000`
+
+this means `saveStateSnapshot()` is called after the sequence number reaches multiple of 1000.
+
+@@@ note
+
+`saveStateSnapshot()` might not be called exactly at sequence numbers being multiple of the `snapshot-after` configuration value.
+This is because `PersistentFSM` works in a sort of "batch" mode when processing and persisting events, and `saveStateSnapshot()`
+is called only at the end of the "batch". For example, if you set `akka.persistence.fsm.snapshot-after = 1000`,
+it is possible that `saveStateSnapshot()` is called at `lastSequenceNr = 1005, 2003, ... `
+A single batch might persist state transition, also there could be multiple domain events to be persisted
+if you pass them to `applying` method in the `PersistFSM` DSL.
+
+@@@
+
## Storage plugins
diff --git a/akka-persistence/src/main/resources/reference.conf b/akka-persistence/src/main/resources/reference.conf
index 176329368b..c867fc5034 100644
--- a/akka-persistence/src/main/resources/reference.conf
+++ b/akka-persistence/src/main/resources/reference.conf
@@ -179,6 +179,15 @@ akka.persistence {
reset-timeout = 60s
}
}
+
+ fsm {
+ # PersistentFSM saves snapshots after this number of persistent
+ # events. Snapshots are used to reduce recovery times.
+ # When you disable this feature, specify snapshot-after = off.
+ # To enable the feature, specify a number like snapshot-after = 1000
+ # which means a snapshot is taken after persisting every 1000 events.
+ snapshot-after = off
+ }
}
# Protobuf serialization for the persistent extension messages.
diff --git a/akka-persistence/src/main/scala/akka/persistence/fsm/PersistentFSM.scala b/akka-persistence/src/main/scala/akka/persistence/fsm/PersistentFSM.scala
index 4046687f74..ca9be50ef0 100644
--- a/akka-persistence/src/main/scala/akka/persistence/fsm/PersistentFSM.scala
+++ b/akka-persistence/src/main/scala/akka/persistence/fsm/PersistentFSM.scala
@@ -9,12 +9,45 @@ import akka.annotation.InternalApi
import akka.persistence.fsm.PersistentFSM.FSMState
import akka.persistence.serialization.Message
import akka.persistence.{ PersistentActor, RecoveryCompleted, SnapshotOffer }
+import com.typesafe.config.Config
import scala.annotation.varargs
import scala.collection.immutable
import scala.concurrent.duration._
import scala.reflect.ClassTag
+/**
+ * SnapshotAfter Extension Id and factory for creating SnapshotAfter extension
+ */
+private[akka] object SnapshotAfter extends ExtensionId[SnapshotAfter] with ExtensionIdProvider {
+ override def get(system: ActorSystem): SnapshotAfter = super.get(system)
+
+ override def lookup = SnapshotAfter
+
+ override def createExtension(system: ExtendedActorSystem): SnapshotAfter = new SnapshotAfter(system.settings.config)
+}
+
+/**
+ * SnapshotAfter enables PersistentFSM to take periodical snapshot.
+ * See `akka.persistence.fsm.snapshot-after` for configuration options.
+ */
+private[akka] class SnapshotAfter(config: Config) extends Extension {
+ val key = "akka.persistence.fsm.snapshot-after"
+ val snapshotAfterValue = config.getString(key).toLowerCase match {
+ case "off" ⇒ None
+ case _ ⇒ Some(config.getInt(key))
+ }
+
+ /**
+ * Function that takes lastSequenceNr as the param, and returns whether the passed
+ * sequence number should trigger auto snapshot or not
+ */
+ val isSnapshotAfterSeqNo: Long ⇒ Boolean = snapshotAfterValue match {
+ case Some(snapShotAfterValue) ⇒ seqNo: Long ⇒ seqNo % snapShotAfterValue == 0
+ case None ⇒ seqNo: Long ⇒ false //always false, if snapshotAfter is not specified in config
+ }
+}
+
/**
* A FSM implementation with persistent state.
*
@@ -112,20 +145,29 @@ trait PersistentFSM[S <: FSMState, D, E] extends PersistentActor with Persistent
var nextData: D = stateData
var handlersExecutedCounter = 0
+ val snapshotAfterExtension = SnapshotAfter.get(context.system)
+ var doSnapshot: Boolean = false
+
def applyStateOnLastHandler() = {
handlersExecutedCounter += 1
if (handlersExecutedCounter == eventsToPersist.size) {
super.applyState(nextState using nextData)
currentStateTimeout = nextState.timeout
nextState.afterTransitionDo(stateData)
+ if (doSnapshot) {
+ log.info("Saving snapshot, sequence number [{}]", snapshotSequenceNr)
+ saveStateSnapshot()
+ }
}
}
persistAll[Any](eventsToPersist) {
case domainEventTag(event) ⇒
nextData = applyEvent(event, nextData)
+ doSnapshot = doSnapshot || snapshotAfterExtension.isSnapshotAfterSeqNo(lastSequenceNr)
applyStateOnLastHandler()
case StateChangeEvent(stateIdentifier, timeout) ⇒
+ doSnapshot = doSnapshot || snapshotAfterExtension.isSnapshotAfterSeqNo(lastSequenceNr)
applyStateOnLastHandler()
}
}
diff --git a/akka-persistence/src/test/scala/akka/persistence/fsm/PersistentFSMSpec.scala b/akka-persistence/src/test/scala/akka/persistence/fsm/PersistentFSMSpec.scala
index 83cf191a3f..377b2c2b77 100644
--- a/akka-persistence/src/test/scala/akka/persistence/fsm/PersistentFSMSpec.scala
+++ b/akka-persistence/src/test/scala/akka/persistence/fsm/PersistentFSMSpec.scala
@@ -4,11 +4,15 @@
package akka.persistence.fsm
-import akka.actor._
+import java.io.File
+
+import akka.actor.{ ActorSystem, _ }
import akka.persistence._
import akka.persistence.fsm.PersistentFSM._
+import akka.persistence.fsm.PersistentFSMSpec.IntAdded
import akka.testkit._
-import com.typesafe.config.Config
+import com.typesafe.config.{ Config, ConfigFactory }
+import org.apache.commons.io.FileUtils
import scala.concurrent.duration._
import scala.language.postfixOps
@@ -343,6 +347,41 @@ abstract class PersistentFSMSpec(config: Config) extends PersistenceSpec(config)
}
+ "save periodical snapshots if akka.persistence.fsm.enable-snapshot-after = on" in {
+ val sys2 = ActorSystem(
+ "PersistentFsmSpec2",
+ ConfigFactory
+ .parseString("""
+ akka.persistence.fsm.enable-snapshot-after = on
+ akka.persistence.fsm.snapshot-after = 3
+ """).withFallback(PersistenceSpec.config("leveldb", "PersistentFSMSpec2")))
+
+ try {
+ val probe = TestProbe()
+ val fsmRef = sys2.actorOf(SnapshotFSM.props(probe.ref))
+
+ fsmRef ! 1
+ fsmRef ! 2
+ fsmRef ! 3
+ // Needs to wait with expectMsg, before sending the next message to fsmRef.
+ // Otherwise, stateData sent to this probe is already updated
+ probe.expectMsg("SeqNo=3, StateData=List(3, 2, 1)")
+
+ fsmRef ! "4x" //changes the state to Persist4xAtOnce, also updates SeqNo although nothing is persisted
+ fsmRef ! 10 //Persist4xAtOnce = persist 10, 4x times
+ // snapshot-after = 3, but the SeqNo is not multiple of 3,
+ // as saveStateSnapshot() is called at the end of persistent event "batch" = 4x of 10's.
+ probe.expectMsg("SeqNo=8, StateData=List(10, 10, 10, 10, 3, 2, 1)")
+
+ } finally {
+ val storageLocations = List(
+ "akka.persistence.journal.leveldb.dir",
+ "akka.persistence.journal.leveldb-shared.store.dir",
+ "akka.persistence.snapshot-store.local.dir").map(s ⇒ new File(sys2.settings.config.getString(s)))
+ shutdown(sys2)
+ storageLocations.foreach(FileUtils.deleteDirectory)
+ }
+ }
}
}
@@ -547,6 +586,47 @@ object PersistentFSMSpec {
}
}
+ sealed trait SnapshotFSMState extends PersistentFSM.FSMState
+ case object PersistSingleAtOnce extends SnapshotFSMState { override def identifier: String = "PersistSingleAtOnce" }
+ case object Persist4xAtOnce extends SnapshotFSMState { override def identifier: String = "Persist4xAtOnce" }
+
+ sealed trait SnapshotFSMEvent
+ case class IntAdded(i: Int) extends SnapshotFSMEvent
+
+ object SnapshotFSM {
+ def props(probe: ActorRef) = Props(new SnapshotFSM(probe))
+ }
+
+ class SnapshotFSM(probe: ActorRef)(implicit val domainEventClassTag: ClassTag[SnapshotFSMEvent])
+ extends Actor with PersistentFSM[SnapshotFSMState, List[Int], SnapshotFSMEvent] {
+
+ override def persistenceId: String = "snapshot-fsm-test"
+
+ override def applyEvent(event: SnapshotFSMEvent, currentData: List[Int]): List[Int] = event match {
+ case IntAdded(i) ⇒ i :: currentData
+ }
+
+ startWith(PersistSingleAtOnce, Nil)
+
+ when(PersistSingleAtOnce) {
+ case Event(i: Int, _) ⇒
+ stay applying IntAdded(i)
+ case Event("4x", _) ⇒
+ goto(Persist4xAtOnce)
+ case Event(SaveSnapshotSuccess(metadata), _) ⇒
+ probe ! s"SeqNo=${metadata.sequenceNr}, StateData=${stateData}"
+ stay()
+ }
+
+ when(Persist4xAtOnce) {
+ case Event(i: Int, _) ⇒
+ stay applying (IntAdded(i), IntAdded(i), IntAdded(i), IntAdded(i))
+ case Event(SaveSnapshotSuccess(metadata), _) ⇒
+ probe ! s"SeqNo=${metadata.sequenceNr}, StateData=${stateData}"
+ stay()
+ }
+ }
+
}
class LeveldbPersistentFSMSpec extends PersistentFSMSpec(PersistenceSpec.config("leveldb", "PersistentFSMSpec"))