Merge pull request #18052 from ktoso/wip-snapshot-store-optional-ktoso
+per #17668 allows not configuring a snapshot-store if it is never used
This commit is contained in:
commit
e23aef93a1
7 changed files with 146 additions and 2 deletions
|
|
@ -485,6 +485,14 @@ If not specified, they default to ``SnapshotSelectionCriteria.latest()`` which s
|
|||
To disable snapshot-based recovery, applications should use ``SnapshotSelectionCriteria.none()``. A recovery where no
|
||||
saved snapshot matches the specified ``SnapshotSelectionCriteria`` will replay all journaled messages.
|
||||
|
||||
.. note::
|
||||
In order to use snapshots a default snapshot-store (``akka.persistence.snapshot-store.plugin``) must be configured,
|
||||
or the persistent actor can pick a snapshot store explicitly by overriding ``String snapshotPluginId()``.
|
||||
|
||||
Since it is acceptable for some applications to not use any snapshotting, it is legal to not configure a snapshot store,
|
||||
however Akka will log a warning message when this situation is detected and then continue to operate until
|
||||
an actor tries to store a snapshot, at which point the the operation will fail (by replying with an ``SaveSnapshotFailure`` for example).
|
||||
|
||||
Snapshot deletion
|
||||
-----------------
|
||||
|
||||
|
|
|
|||
|
|
@ -488,6 +488,14 @@ If not specified, they default to ``SnapshotSelectionCriteria.latest()`` which s
|
|||
To disable snapshot-based recovery, applications should use ``SnapshotSelectionCriteria.none()``. A recovery where no
|
||||
saved snapshot matches the specified ``SnapshotSelectionCriteria`` will replay all journaled messages.
|
||||
|
||||
.. note::
|
||||
In order to use snapshots a default snapshot-store (``akka.persistence.snapshot-store.plugin``) must be configured,
|
||||
or the persistent actor can pick a snapshot store explicitly by overriding ``String snapshotPluginId()``.
|
||||
|
||||
Since it is acceptable for some applications to not use any snapshotting, it is legal to not configure a snapshot store,
|
||||
however Akka will log a warning message when this situation is detected and then continue to operate until
|
||||
an actor tries to store a snapshot, at which point the the operation will fail (by replying with an ``SaveSnapshotFailure`` for example).
|
||||
|
||||
Snapshot deletion
|
||||
-----------------
|
||||
|
||||
|
|
|
|||
|
|
@ -481,6 +481,14 @@ If not specified, they default to ``SnapshotSelectionCriteria.Latest`` which sel
|
|||
To disable snapshot-based recovery, applications should use ``SnapshotSelectionCriteria.None``. A recovery where no
|
||||
saved snapshot matches the specified ``SnapshotSelectionCriteria`` will replay all journaled messages.
|
||||
|
||||
.. note::
|
||||
In order to use snapshots a default snapshot-store (``akka.persistence.snapshot-store.plugin``) must be configured,
|
||||
or the ``PersistentActor`` can pick a snapshot store explicitly by overriding ``def snapshotPluginId: String``.
|
||||
|
||||
Since it is acceptable for some applications to not use any snapshotting, it is legal to not configure a snapshot store,
|
||||
however Akka will log a warning message when this situation is detected and then continue to operate until
|
||||
an actor tries to store a snapshot, at which point the the operation will fail (by replying with an ``SaveSnapshotFailure`` for example).
|
||||
|
||||
Snapshot deletion
|
||||
-----------------
|
||||
|
||||
|
|
|
|||
|
|
@ -32,6 +32,10 @@ akka.persistence {
|
|||
# Persistent actor or view can override `snapshotPluginId` method in order to rely on a different snapshot plugin.
|
||||
plugin = ""
|
||||
}
|
||||
# used as default-snapshot store if no plugin configured (see `akka.persistence.snapshot-store`)
|
||||
no-snapshot-store {
|
||||
class = "akka.persistence.snapshot.NoSnapshotStore"
|
||||
}
|
||||
# Default persistent view settings.
|
||||
view {
|
||||
# Automated incremental view update.
|
||||
|
|
|
|||
|
|
@ -136,6 +136,7 @@ class Persistence(val system: ExtendedActorSystem) extends Extension {
|
|||
private def log: LoggingAdapter = Logging(system, getClass.getName)
|
||||
|
||||
private val DefaultPluginDispatcherId = "akka.persistence.dispatchers.default-plugin-dispatcher"
|
||||
private val NoSnapshotStorePluginId = "akka.persistence.no-snapshot-store"
|
||||
|
||||
private val config = system.settings.config.getConfig("akka.persistence")
|
||||
|
||||
|
|
@ -149,8 +150,13 @@ class Persistence(val system: ExtendedActorSystem) extends Extension {
|
|||
// Lazy, so user is not forced to configure defaults when she is not using them.
|
||||
private lazy val defaultSnapshotPluginId = {
|
||||
val configPath = config.getString("snapshot-store.plugin")
|
||||
require(!isEmpty(configPath), "default snapshot-store plugin is not configured, see 'reference.conf'")
|
||||
configPath
|
||||
|
||||
if (isEmpty(configPath)) {
|
||||
log.warning("No default snapshot store configured! " +
|
||||
"To configure a default snapshot-store plugin set the `akka.persistence.snapshot-store.plugin` key. " +
|
||||
"For details see 'reference.conf'")
|
||||
NoSnapshotStorePluginId
|
||||
} else configPath
|
||||
}
|
||||
|
||||
val settings = new PersistenceSettings(config)
|
||||
|
|
|
|||
|
|
@ -0,0 +1,40 @@
|
|||
/*
|
||||
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.persistence.snapshot
|
||||
|
||||
import akka.persistence.{ SelectedSnapshot, SnapshotMetadata, SnapshotSelectionCriteria }
|
||||
|
||||
import scala.concurrent.Future
|
||||
|
||||
/**
|
||||
* Used as default snapshot-store in case no other store was configured.
|
||||
*
|
||||
* If a [[akka.persistence.PersistentActor]] calls the [[akka.persistence.PersistentActor#saveSnapshot]] method,
|
||||
* and at the same time does not configure a specific snapshot-store to be used *and* no default snapshot-store
|
||||
* is available, then the `NoSnapshotStore` will be used to signal a snapshot store failure.
|
||||
*/
|
||||
final class NoSnapshotStore extends SnapshotStore {
|
||||
|
||||
final class NoSnapshotStoreException extends RuntimeException("No snapshot store configured!")
|
||||
|
||||
private val flop: Future[Nothing] =
|
||||
Future.failed(new NoSnapshotStoreException)
|
||||
|
||||
private val none: Future[Option[SelectedSnapshot]] =
|
||||
Future.successful(None)
|
||||
|
||||
override def loadAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]] =
|
||||
none
|
||||
|
||||
override def saveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit] =
|
||||
flop
|
||||
|
||||
override def deleteAsync(metadata: SnapshotMetadata): Future[Unit] =
|
||||
flop
|
||||
|
||||
override def deleteAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Unit] =
|
||||
flop
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,70 @@
|
|||
/*
|
||||
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.persistence
|
||||
|
||||
import akka.actor.{ Actor, Props }
|
||||
import akka.event.Logging
|
||||
import akka.event.Logging.Warning
|
||||
import akka.testkit.{ EventFilter, ImplicitSender, TestEvent }
|
||||
import com.typesafe.config.ConfigFactory
|
||||
|
||||
import scala.language.postfixOps
|
||||
|
||||
object OptionalSnapshotStoreSpec {
|
||||
|
||||
class AnyPersistentActor(name: String) extends PersistentActor {
|
||||
var lastSender = context.system.deadLetters
|
||||
|
||||
override def persistenceId = name
|
||||
override def receiveCommand: Receive = {
|
||||
case s: String ⇒
|
||||
lastSender = sender()
|
||||
saveSnapshot(s)
|
||||
case f: SaveSnapshotFailure ⇒ lastSender ! f
|
||||
case s: SaveSnapshotSuccess ⇒ lastSender ! s
|
||||
}
|
||||
override def receiveRecover: Receive = Actor.emptyBehavior
|
||||
}
|
||||
|
||||
class PickedSnapshotStorePersistentActor(name: String) extends AnyPersistentActor(name) {
|
||||
override def snapshotPluginId: String = "akka.persistence.snapshot-store.local"
|
||||
}
|
||||
}
|
||||
|
||||
class OptionalSnapshotStoreSpec extends PersistenceSpec(ConfigFactory.parseString(
|
||||
s"""
|
||||
akka.persistence.publish-plugin-commands = on
|
||||
akka.persistence.journal.plugin = "akka.persistence.journal.inmem"
|
||||
akka.persistence.journal.leveldb.dir = "target/journal-${classOf[OptionalSnapshotStoreSpec].getName}"
|
||||
|
||||
# snapshot store plugin is NOT defined, things should still work
|
||||
akka.persistence.snapshot-store.local.dir = "target/snapshots-${classOf[OptionalSnapshotStoreSpec].getName}/"
|
||||
""")) with ImplicitSender {
|
||||
import OptionalSnapshotStoreSpec._
|
||||
|
||||
system.eventStream.publish(TestEvent.Mute(EventFilter[akka.pattern.AskTimeoutException]()))
|
||||
|
||||
"Persistence extension" must {
|
||||
"initialize properly even in absence of configured snapshot store" in {
|
||||
system.actorOf(Props(classOf[AnyPersistentActor], name))
|
||||
system.eventStream.subscribe(testActor, classOf[Logging.Warning])
|
||||
val message = expectMsgType[Warning].message.toString
|
||||
message should include("No default snapshot store configured")
|
||||
}
|
||||
|
||||
"fail if PersistentActor tries to saveSnapshot without snapshot-store available" in {
|
||||
val persistentActor = system.actorOf(Props(classOf[AnyPersistentActor], name))
|
||||
persistentActor ! "snap"
|
||||
expectMsgType[SaveSnapshotFailure].cause.getMessage should include("No snapshot store configured")
|
||||
}
|
||||
|
||||
"successfully save a snapshot when no default snapshot-store configured, yet PersistentActor picked one explicitly" in {
|
||||
val persistentActor = system.actorOf(Props(classOf[PickedSnapshotStorePersistentActor], name))
|
||||
persistentActor ! "snap"
|
||||
expectMsgType[SaveSnapshotSuccess]
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
Add table
Add a link
Reference in a new issue