Consolidate duplicate persistence sharding function #26451 (#26452)

This commit is contained in:
Helena Edelson 2019-03-05 08:05:51 -05:00 committed by GitHub
parent 7f0837a550
commit 27957649e8
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 29 additions and 28 deletions

View file

@ -562,21 +562,9 @@ private[akka] class PersistentShard(
}
override def receiveCommand: Receive = ({
case SaveSnapshotSuccess(m)
case e: SaveSnapshotSuccess
log.debug("PersistentShard snapshot saved successfully")
/*
* delete old events but keep the latest around because
*
* it's not safe to delete all events immediately because snapshots are typically stored with a weaker consistency
* level which means that a replay might "see" the deleted events before it sees the stored snapshot,
* i.e. it will use an older snapshot and then not replay the full sequence of events
*
* for debugging if something goes wrong in production it's very useful to be able to inspect the events
*/
val deleteToSequenceNr = m.sequenceNr - keepNrOfBatches * snapshotAfter
if (deleteToSequenceNr > 0) {
deleteMessages(deleteToSequenceNr)
}
internalDeleteMessagesBeforeSnapshot(e, keepNrOfBatches, snapshotAfter)
case SaveSnapshotFailure(_, reason)
log.warning("PersistentShard snapshot failure: [{}]", reason.getMessage)

View file

@ -874,21 +874,9 @@ class PersistentShardCoordinator(typeName: String, settings: ClusterShardingSett
}: Receive).orElse[Any, Unit](receiveTerminated).orElse[Any, Unit](receiveSnapshotResult)
def receiveSnapshotResult: Receive = {
case SaveSnapshotSuccess(m)
case e: SaveSnapshotSuccess
log.debug("Persistent snapshot saved successfully")
/*
* delete old events but keep the latest around because
*
* it's not safe to delete all events immediate because snapshots are typically stored with a weaker consistency
* level which means that a replay might "see" the deleted events before it sees the stored snapshot,
* i.e. it will use an older snapshot and then not replay the full sequence of events
*
* for debugging if something goes wrong in production it's very useful to be able to inspect the events
*/
val deleteToSequenceNr = m.sequenceNr - keepNrOfBatches * snapshotAfter
if (deleteToSequenceNr > 0) {
deleteMessages(deleteToSequenceNr)
}
internalDeleteMessagesBeforeSnapshot(e, keepNrOfBatches, snapshotAfter)
case SaveSnapshotFailure(_, reason)
log.warning("Persistent snapshot failure: {}", reason.getMessage)

View file

@ -0,0 +1,2 @@
# #26451 Consolidate duplicate persistence sharding function
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.Eventsourced.internalDeleteMessagesBeforeSnapshot")

View file

@ -442,6 +442,29 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas
s"toSequenceNr [$toSequenceNr] must be less than or equal to lastSequenceNr [$lastSequenceNr]"), toSequenceNr)
}
/**
* INTERNAL API.
* An [[Eventsourced]] actor can request cleanup by deleting either a range of, or all persistent events.
* For example, on successful snapshot completion, delete messages within a configurable `snapshotAfter`
* range that are less than or equal to the given [[SnapshotMetadata.sequenceNr]]
* (provided the [[SnapshotMetadata.sequenceNr]] is <= to [[Eventsourced#lastSequenceNr]]).
*
* Or delete all by using `Long.MaxValue` as the `toSequenceNr`
* {{{ m.copy(sequenceNr = Long.MaxValue) }}}
*/
@InternalApi private[akka] def internalDeleteMessagesBeforeSnapshot(
e: SaveSnapshotSuccess,
keepNrOfBatches: Int,
snapshotAfter: Int): Unit = {
/* Delete old events but keep the latest around
1. It's not safe to delete all events immediately because snapshots are typically stored with
a weaker consistency level. A replay might "see" the deleted events before it sees the stored
snapshot, i.e. it could use an older snapshot and not replay the full sequence of events
2. If there is a production failure, it's useful to be able to inspect the events while debugging */
val sequenceNr = e.metadata.sequenceNr - keepNrOfBatches * snapshotAfter
if (sequenceNr > 0) deleteMessages(sequenceNr)
}
/**
* Returns `true` if this persistent actor is currently recovering.
*/