diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/EventSourcedSignal.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/EventSourcedSignal.scala index 317910645a..2ab93c45a3 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/EventSourcedSignal.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/EventSourcedSignal.scala @@ -52,14 +52,14 @@ final case class SnapshotFailed(metadata: SnapshotMetadata, failure: Throwable) def getSnapshotMetadata(): SnapshotMetadata = metadata } -final case class DeleteSnapshotCompleted(target: DeletionTarget) extends EventSourcedSignal { +final case class DeleteSnapshotsCompleted(target: DeletionTarget) extends EventSourcedSignal { /** * Java API */ def getTarget(): DeletionTarget = target } -final case class DeleteSnapshotFailed(target: DeletionTarget, failure: Throwable) extends EventSourcedSignal { +final case class DeleteSnapshotsFailed(target: DeletionTarget, failure: Throwable) extends EventSourcedSignal { /** * Java API @@ -72,6 +72,26 @@ final case class DeleteSnapshotFailed(target: DeletionTarget, failure: Throwable def getTarget(): DeletionTarget = target } +final case class DeleteMessagesCompleted(toSequenceNr: Long) extends EventSourcedSignal { + + /** + * Java API + */ + def getToSequenceNr(): Long = toSequenceNr +} +final case class DeleteMessagesFailed(toSequenceNr: Long, failure: Throwable) extends EventSourcedSignal { + + /** + * Java API + */ + def getFailure(): Throwable = failure + + /** + * Java API + */ + def getToSequenceNr(): Long = toSequenceNr +} + /** * Not for user extension */ diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/Retention.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/Retention.scala new file mode 100644 index 0000000000..ddc9fbcd2b --- /dev/null +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/Retention.scala @@ -0,0 +1,55 @@ +/* + * Copyright (C) 2019 Lightbend Inc. + */ + +package akka.persistence.typed + +/** + * Setup snapshot and event delete/retention behavior. Retention bridges snapshot + * and journal behavior. This defines the retention criteria. + * + * @param snapshotEveryNEvents Snapshots are used to reduce playback/recovery times. + * This defines when a new snapshot is persisted. + * + * @param keepNSnapshots After a snapshot is successfully completed, + * - if 2: retain last maximum 2 *`snapshot-size` events + * and 3 snapshots (2 old + latest snapshot) + * - if 0: all events with equal or lower sequence number + * will not be retained. + * + * @param deleteEventsOnSnapshot Opt-in ability to delete older events on successful + * save of snapshot. Defaults to disabled. + */ +final case class RetentionCriteria(snapshotEveryNEvents: Long, keepNSnapshots: Long, deleteEventsOnSnapshot: Boolean) { + + /** + * Delete Messages: + * {{{ toSequenceNr - keepNSnapshots * snapshotEveryNEvents }}} + * Delete Snapshots: + * {{{ (toSequenceNr - 1) - (keepNSnapshots * snapshotEveryNEvents) }}} + * + * @param lastSequenceNr the sequence number to delete to if `deleteEventsOnSnapshot` is false + */ + def toSequenceNumber(lastSequenceNr: Long): Long = { + // Delete old events, retain the latest + lastSequenceNr - (keepNSnapshots * snapshotEveryNEvents) + } +} + +object RetentionCriteria { + + def apply(): RetentionCriteria = + RetentionCriteria(snapshotEveryNEvents = 1000L, keepNSnapshots = 2L, deleteEventsOnSnapshot = false) + + /** Scala API. */ + def apply(snapshotEveryNEvents: Long, keepNSnapshots: Long): RetentionCriteria = + RetentionCriteria(snapshotEveryNEvents, keepNSnapshots, deleteEventsOnSnapshot = false) + + /** Java API. */ + def create(snapshotEveryNEvents: Long, keepNSnapshots: Long): RetentionCriteria = + apply(snapshotEveryNEvents, keepNSnapshots) + + /** Java API. */ + def create(snapshotEveryNEvents: Long, keepNSnapshots: Long, deleteMessagesOnSnapshot: Boolean): RetentionCriteria = + apply(snapshotEveryNEvents, keepNSnapshots, deleteMessagesOnSnapshot) +} diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/BehaviorSetup.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/BehaviorSetup.scala index a2334d6e80..9d2f775582 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/BehaviorSetup.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/BehaviorSetup.scala @@ -15,6 +15,7 @@ import akka.annotation.InternalApi import akka.persistence._ import akka.persistence.typed.EventAdapter import akka.persistence.typed.PersistenceId +import akka.persistence.typed.RetentionCriteria import akka.persistence.typed.scaladsl.EventSourcedBehavior import akka.util.ConstantFun import akka.util.OptionVal @@ -35,9 +36,11 @@ private[akka] final class BehaviorSetup[C, E, S]( val eventAdapter: EventAdapter[E, _], val snapshotWhen: (S, E, Long) ⇒ Boolean, val recovery: Recovery, + val retention: RetentionCriteria, var holdingRecoveryPermit: Boolean, val settings: EventSourcedSettings, val stashState: StashState) { + import InternalProtocol.RecoveryTickEvent import akka.actor.typed.scaladsl.adapter._ diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala index af2d5196e9..72e9cd312a 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala @@ -7,11 +7,8 @@ package akka.persistence.typed.internal import java.util.UUID import java.util.concurrent.atomic.AtomicInteger -import scala.util.Failure -import scala.util.Success -import scala.util.Try import scala.util.control.NonFatal -import akka.Done + import akka.actor.typed import akka.actor.typed.BackoffSupervisorStrategy import akka.actor.typed.Behavior @@ -23,9 +20,14 @@ import akka.actor.typed.scaladsl.ActorContext import akka.actor.typed.scaladsl.Behaviors import akka.annotation.InternalApi import akka.persistence._ +import akka.persistence.typed.DeleteMessagesFailed +import akka.persistence.typed.DeleteSnapshotsCompleted +import akka.persistence.typed.DeleteSnapshotsFailed +import akka.persistence.typed.DeletionTarget import akka.persistence.typed.EventAdapter import akka.persistence.typed.NoOpEventAdapter import akka.persistence.typed.PersistenceId +import akka.persistence.typed.RetentionCriteria import akka.persistence.typed.SnapshotCompleted import akka.persistence.typed.SnapshotFailed import akka.persistence.typed.scaladsl._ @@ -62,6 +64,7 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State]( eventAdapter: EventAdapter[Event, Any] = NoOpEventAdapter.instance[Event], snapshotWhen: (State, Event, Long) ⇒ Boolean = ConstantFun.scalaAnyThreeToFalse, recovery: Recovery = Recovery(), + retention: RetentionCriteria = RetentionCriteria(), supervisionStrategy: SupervisorStrategy = SupervisorStrategy.stop, override val signalHandler: PartialFunction[Signal, Unit] = PartialFunction.empty) extends EventSourcedBehavior[Command, Event, State] { @@ -79,9 +82,19 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State]( val actualSignalHandler: PartialFunction[Signal, Unit] = signalHandler.orElse { // default signal handler is always the fallback case SnapshotCompleted(meta: SnapshotMetadata) ⇒ - ctx.log.debug("Save snapshot successful, snapshot metadata: [{}]", meta) + ctx.log.debug("Save snapshot successful, snapshot metadata [{}]", meta) case SnapshotFailed(meta, failure) ⇒ - ctx.log.error(failure, "Save snapshot failed, snapshot metadata: [{}]", meta) + ctx.log.error(failure, "Save snapshot failed, snapshot metadata [{}]", meta) + case DeleteSnapshotsCompleted(DeletionTarget.Individual(meta)) => + ctx.log.debug(s"Persistent snapshot [{}] deleted successfully.", meta) + case DeleteSnapshotsCompleted(DeletionTarget.Criteria(criteria)) => + ctx.log.debug(s"Persistent snapshots given criteria [{}] deleted successfully.", criteria) + case DeleteSnapshotsFailed(DeletionTarget.Individual(meta), failure) => + ctx.log.warning("Failed to delete snapshot with meta [{}] due to [{}].", meta, failure) + case DeleteSnapshotsFailed(DeletionTarget.Criteria(criteria), failure) => + ctx.log.warning("Failed to delete snapshots given criteria [{}] due to [{}].", criteria, failure) + case DeleteMessagesFailed(toSequenceNr, failure) => + ctx.log.warning("Failed to delete messages toSequenceNr [{}] due to [{}].", toSequenceNr, failure) } Behaviors @@ -99,6 +112,7 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State]( eventAdapter, snapshotWhen, recovery, + retention, holdingRecoveryPermit = false, settings = settings, stashState = stashState) @@ -171,6 +185,9 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State]( copy(recovery = Recovery(selection)) } + override def withRetention(criteria: RetentionCriteria): EventSourcedBehavior[Command, Event, State] = + copy(retention = criteria) + override def withTagger(tagger: Event => Set[String]): EventSourcedBehavior[Command, Event, State] = copy(tagger = tagger) diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/JournalInteractions.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/JournalInteractions.scala index e1158c29c8..4d810e5134 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/JournalInteractions.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/JournalInteractions.scala @@ -106,6 +106,29 @@ private[akka] trait JournalInteractions[C, E, S] { } // else, no need to return the permit } + /** + * On [[akka.persistence.SaveSnapshotSuccess]], if [[akka.persistence.typed.RetentionCriteria.deleteEventsOnSnapshot]] + * is enabled, old messages are deleted based on [[akka.persistence.typed.RetentionCriteria.snapshotEveryNEvents]] + * before old snapshots are deleted. + */ + protected def internalDeleteEvents(e: SaveSnapshotSuccess, state: Running.RunningState[S]): Unit = + if (setup.retention.deleteEventsOnSnapshot) { + val toSequenceNr = setup.retention.toSequenceNumber(e.metadata.sequenceNr) + + if (toSequenceNr > 0) { + val lastSequenceNr = state.seqNr + val self = setup.selfUntyped + + if (toSequenceNr == Long.MaxValue || toSequenceNr <= lastSequenceNr) + setup.journal ! JournalProtocol.DeleteMessagesTo(e.metadata.persistenceId, toSequenceNr, self) + else + self ! DeleteMessagesFailure( + new RuntimeException( + s"toSequenceNr [$toSequenceNr] must be less than or equal to lastSequenceNr [$lastSequenceNr]"), + toSequenceNr) + } + } + // ---------- snapshot store interactions --------- /** @@ -125,4 +148,16 @@ private[akka] trait JournalInteractions[C, E, S] { setup.selfUntyped) } + /** Deletes the snapshot identified by `sequenceNr`. */ + protected def internalDeleteSnapshots(toSequenceNr: Long): Unit = { + setup.log.debug("Deleting snapshot to [{}]", toSequenceNr) + + val deleteTo = toSequenceNr - 1 + val deleteFrom = math.max(0, setup.retention.toSequenceNumber(deleteTo)) + val snapshotCriteria = SnapshotSelectionCriteria(minSequenceNr = deleteFrom, maxSequenceNr = deleteTo) + + setup.log.debug("Deleting snapshots from [{}] to [{}]", deleteFrom, deleteTo) + setup.snapshotStore + .tell(SnapshotProtocol.DeleteSnapshots(setup.persistenceId.id, snapshotCriteria), setup.selfUntyped) + } } diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala index c76069c723..d26c9ee9c7 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala @@ -6,6 +6,7 @@ package akka.persistence.typed.internal import scala.annotation.tailrec import scala.collection.immutable + import akka.actor.UnhandledMessage import akka.actor.typed.Behavior import akka.actor.typed.Signal @@ -17,8 +18,10 @@ import akka.persistence.JournalProtocol._ import akka.persistence._ import akka.persistence.journal.Tagged import akka.persistence.typed.Callback -import akka.persistence.typed.DeleteSnapshotCompleted -import akka.persistence.typed.DeleteSnapshotFailed +import akka.persistence.typed.DeleteSnapshotsCompleted +import akka.persistence.typed.DeleteSnapshotsFailed +import akka.persistence.typed.DeleteMessagesCompleted +import akka.persistence.typed.DeleteMessagesFailed import akka.persistence.typed.DeletionTarget import akka.persistence.typed.EventRejectedException import akka.persistence.typed.SideEffect @@ -93,10 +96,9 @@ private[akka] object Running { def onMessage(msg: InternalProtocol): Behavior[InternalProtocol] = msg match { case IncomingCommand(c: C @unchecked) => onCommand(state, c) - case SnapshotterResponse(r) => - setup.log.warning("Unexpected SnapshotterResponse {}", r) - Behaviors.unhandled - case _ => Behaviors.unhandled + case SnapshotterResponse(r) => onSnapshotterResponse(r) + case JournalResponse(r) => onJournalResponse(r) + case _ => Behaviors.unhandled } override def onSignal: PartialFunction[Signal, Behavior[InternalProtocol]] = { @@ -110,6 +112,53 @@ private[akka] object Running { applyEffects(cmd, state, effect.asInstanceOf[EffectImpl[E, S]]) // TODO can we avoid the cast? } + /** Handle journal responses for non-persist events workloads. */ + def onJournalResponse(response: JournalProtocol.Response): Behavior[InternalProtocol] = { + val signal = response match { + case DeleteMessagesSuccess(toSequenceNr) => + setup.log.debug("Persistent messages to [{}] deleted successfully.", toSequenceNr) + internalDeleteSnapshots(toSequenceNr) + Some(DeleteMessagesCompleted(toSequenceNr)) + + case DeleteMessagesFailure(e, toSequenceNr) => + Some(DeleteMessagesFailed(toSequenceNr, e)) + case _ => + None + } + + signal match { + case Some(sig) => + setup.onSignal(sig) + this + case None => + Behaviors.unhandled // unexpected journal response + } + } + + /** Handle snapshot responses for non-persist events workloads. */ + def onSnapshotterResponse(response: SnapshotProtocol.Response): Behavior[InternalProtocol] = { + val signal = response match { + case DeleteSnapshotsSuccess(criteria) => + Some(DeleteSnapshotsCompleted(DeletionTarget.Criteria(criteria))) + case DeleteSnapshotsFailure(criteria, error) => + Some(DeleteSnapshotsFailed(DeletionTarget.Criteria(criteria), error)) + case DeleteSnapshotSuccess(meta) => + Some(DeleteSnapshotsCompleted(DeletionTarget.Individual(meta))) + case DeleteSnapshotFailure(meta, error) => + Some(DeleteSnapshotsFailed(DeletionTarget.Individual(meta), error)) + case _ => + None + } + + signal match { + case Some(sig) => + setup.onSignal(sig) + this + case None => + Behaviors.unhandled // unexpected snapshot response + } + } + @tailrec def applyEffects( msg: Any, state: RunningState[S], @@ -315,21 +364,25 @@ private[akka] object Running { def onSnapshotterResponse(response: SnapshotProtocol.Response): Unit = { val signal = response match { - case SaveSnapshotSuccess(meta) => + case e @ SaveSnapshotSuccess(meta) => + // # 24698 The deletion of old events are automatic, snapshots are triggered by the SaveSnapshotSuccess. + setup.log.debug(s"Persistent snapshot [{}] saved successfully", meta) + if (setup.retention.deleteEventsOnSnapshot) + internalDeleteEvents(e, state) // if successful, DeleteMessagesSuccess then internalDeleteSnapshots + else + internalDeleteSnapshots(meta.sequenceNr) + Some(SnapshotCompleted(meta)) - case SaveSnapshotFailure(meta, ex) => - Some(SnapshotFailed(meta, ex)) - case DeleteSnapshotSuccess(meta) => - Some(DeleteSnapshotCompleted(DeletionTarget.Individual(meta))) - case DeleteSnapshotFailure(meta, ex) => - Some(DeleteSnapshotFailed(DeletionTarget.Individual(meta), ex)) - case DeleteSnapshotsSuccess(criteria) => - Some(DeleteSnapshotCompleted(DeletionTarget.Criteria(criteria))) - case DeleteSnapshotsFailure(criteria, failure) => - Some(DeleteSnapshotFailed(DeletionTarget.Criteria(criteria), failure)) - case _ => None + + case SaveSnapshotFailure(meta, error) => + setup.log.warning("Failed to save snapshot given metadata [{}] due to [{}]", meta, error.getMessage) + Some(SnapshotFailed(meta, error)) + + case _ => + None } + setup.log.debug("Received snapshot event [{}], returning signal [{}].", response, signal) signal.foreach(setup.onSignal _) } diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/EventSourcedBehavior.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/EventSourcedBehavior.scala index 12855b7e99..c8f4f46d72 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/EventSourcedBehavior.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/EventSourcedBehavior.scala @@ -128,6 +128,8 @@ abstract class EventSourcedBehavior[Command, Event, State >: Null] private[akka] def eventAdapter(): EventAdapter[Event, _] = NoOpEventAdapter.instance[Event] + def retentionCriteria: RetentionCriteria = RetentionCriteria() + /** * INTERNAL API: DeferredBehavior init */ diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/EventSourcedBehavior.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/EventSourcedBehavior.scala index aabc48964b..044dd3983f 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/EventSourcedBehavior.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/EventSourcedBehavior.scala @@ -4,9 +4,8 @@ package akka.persistence.typed.scaladsl -import scala.util.Try import scala.annotation.tailrec -import akka.Done + import akka.actor.typed.BackoffSupervisorStrategy import akka.actor.typed.Behavior import akka.actor.typed.Behavior.DeferredBehavior @@ -20,6 +19,7 @@ import akka.persistence._ import akka.persistence.typed.EventAdapter import akka.persistence.typed.ExpectingReply import akka.persistence.typed.PersistenceId +import akka.persistence.typed.RetentionCriteria import akka.persistence.typed.internal._ object EventSourcedBehavior { @@ -176,6 +176,11 @@ object EventSourcedBehavior { */ def withSnapshotSelectionCriteria(selection: SnapshotSelectionCriteria): EventSourcedBehavior[Command, Event, State] + /** + * Criteria for internal retention/deletion of snapshots and events. + */ + def withRetention(criteria: RetentionCriteria): EventSourcedBehavior[Command, Event, State] + /** * The `tagger` function should give event tags, which will be used in persistence query */ diff --git a/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BasicPersistentBehaviorTest.java b/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BasicPersistentBehaviorTest.java index fbb354faca..48cbe9fa88 100644 --- a/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BasicPersistentBehaviorTest.java +++ b/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BasicPersistentBehaviorTest.java @@ -10,6 +10,7 @@ import akka.actor.typed.javadsl.ActorContext; import akka.actor.typed.javadsl.Behaviors; import akka.persistence.typed.PersistenceId; import akka.persistence.typed.RecoveryCompleted; +import akka.persistence.typed.RetentionCriteria; import akka.persistence.typed.javadsl.CommandHandler; import akka.persistence.typed.javadsl.EventHandler; import akka.persistence.typed.javadsl.EventSourcedBehavior; @@ -278,6 +279,12 @@ public class BasicPersistentBehaviorTest { } // #snapshottingPredicate + // #retentionCriteria + @Override // override snapshotEvery in EventSourcedBehavior + public RetentionCriteria retentionCriteria() { + return RetentionCriteria.create(1000, 2); + } + // #retentionCriteria } } diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorSpec.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorSpec.scala index 172177a423..39574377b1 100644 --- a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorSpec.scala +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorSpec.scala @@ -10,8 +10,10 @@ import java.util.concurrent.atomic.AtomicInteger import scala.concurrent.Future import scala.concurrent.Promise import scala.concurrent.duration._ +import scala.util.Failure import scala.util.Success import scala.util.Try + import akka.Done import akka.testkit.EventFilter import akka.actor.testkit.typed.{ TestException, TestKitSettings } @@ -23,29 +25,32 @@ import akka.actor.typed.SupervisorStrategy import akka.actor.typed.Terminated import akka.actor.typed.scaladsl.ActorContext import akka.actor.typed.scaladsl.Behaviors -import akka.persistence.SelectedSnapshot import akka.persistence.SnapshotMetadata import akka.persistence.SnapshotSelectionCriteria +import akka.persistence.SelectedSnapshot import akka.persistence.journal.inmem.InmemJournal import akka.persistence.query.EventEnvelope import akka.persistence.query.PersistenceQuery import akka.persistence.query.Sequence import akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal import akka.persistence.snapshot.SnapshotStore +import akka.persistence.typed.DeleteSnapshotsCompleted +import akka.persistence.typed.DeleteMessagesCompleted import akka.persistence.typed.EventAdapter import akka.persistence.typed.ExpectingReply import akka.persistence.typed.PersistenceId import akka.persistence.typed.RecoveryCompleted import akka.persistence.typed.SnapshotCompleted import akka.persistence.typed.SnapshotFailed +import akka.persistence.typed.DeletionTarget.Criteria +import akka.persistence.typed.EventSourcedSignal +import akka.persistence.typed.RetentionCriteria import akka.stream.ActorMaterializer import akka.stream.scaladsl.Sink import com.typesafe.config.Config import com.typesafe.config.ConfigFactory import org.scalatest.WordSpecLike -import scala.util.Failure - object EventSourcedBehaviorSpec { //#event-wrapper @@ -69,8 +74,16 @@ object EventSourcedBehaviorSpec { Future.successful(()) } - def deleteAsync(metadata: SnapshotMetadata) = ??? - def deleteAsync(persistenceId: String, criteria: SnapshotSelectionCriteria) = ??? + override def deleteAsync(metadata: SnapshotMetadata): Future[Unit] = { + state = state.filterNot { case (k, (_, b)) => k == metadata.persistenceId && b.sequenceNr == metadata.sequenceNr } + Future.successful(()) + } + + override def deleteAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Unit] = { + val range = criteria.minSequenceNr to criteria.maxSequenceNr + state = state.filterNot { case (k, (_, b)) => k == persistenceId && range.contains(b.sequenceNr) } + Future.successful(()) + } } // also used from PersistentActorTest @@ -134,11 +147,18 @@ object EventSourcedBehaviorSpec { persistenceId, loggingActor = TestProbe[String].ref, probe = TestProbe[(State, Event)].ref, - TestProbe[Try[Done]].ref) + snapshotProbe = TestProbe[Try[Done]].ref, + retentionProbe = TestProbe[Try[EventSourcedSignal]].ref) def counter(ctx: ActorContext[Command], persistenceId: PersistenceId, logging: ActorRef[String])( implicit system: ActorSystem[_]): EventSourcedBehavior[Command, Event, State] = - counter(ctx, persistenceId, loggingActor = logging, probe = TestProbe[(State, Event)].ref, TestProbe[Try[Done]].ref) + counter( + ctx, + persistenceId, + loggingActor = logging, + probe = TestProbe[(State, Event)].ref, + TestProbe[Try[Done]].ref, + TestProbe[Try[EventSourcedSignal]].ref) def counterWithProbe( ctx: ActorContext[Command], @@ -146,22 +166,49 @@ object EventSourcedBehaviorSpec { probe: ActorRef[(State, Event)], snapshotProbe: ActorRef[Try[Done]])( implicit system: ActorSystem[_]): EventSourcedBehavior[Command, Event, State] = - counter(ctx, persistenceId, TestProbe[String].ref, probe, snapshotProbe) + counter(ctx, persistenceId, TestProbe[String].ref, probe, snapshotProbe, TestProbe[Try[EventSourcedSignal]].ref) def counterWithProbe(ctx: ActorContext[Command], persistenceId: PersistenceId, probe: ActorRef[(State, Event)])( implicit system: ActorSystem[_]): EventSourcedBehavior[Command, Event, State] = - counter(ctx, persistenceId, TestProbe[String].ref, probe, TestProbe[Try[Done]].ref) + counter( + ctx, + persistenceId, + TestProbe[String].ref, + probe, + TestProbe[Try[Done]].ref, + TestProbe[Try[EventSourcedSignal]].ref) def counterWithSnapshotProbe(ctx: ActorContext[Command], persistenceId: PersistenceId, probe: ActorRef[Try[Done]])( implicit system: ActorSystem[_]): EventSourcedBehavior[Command, Event, State] = - counter(ctx, persistenceId, TestProbe[String].ref, TestProbe[(State, Event)].ref, snapshotProbe = probe) + counter( + ctx, + persistenceId, + TestProbe[String].ref, + TestProbe[(State, Event)].ref, + snapshotProbe = probe, + TestProbe[Try[EventSourcedSignal]].ref) + + def counterWithSnapshotAndRetentionProbe( + ctx: ActorContext[Command], + persistenceId: PersistenceId, + probeS: ActorRef[Try[Done]], + probeR: ActorRef[Try[EventSourcedSignal]])( + implicit system: ActorSystem[_]): EventSourcedBehavior[Command, Event, State] = + counter( + ctx, + persistenceId, + TestProbe[String].ref, + TestProbe[(State, Event)].ref, + snapshotProbe = probeS, + retentionProbe = probeR) def counter( ctx: ActorContext[Command], persistenceId: PersistenceId, loggingActor: ActorRef[String], probe: ActorRef[(State, Event)], - snapshotProbe: ActorRef[Try[Done]]): EventSourcedBehavior[Command, Event, State] = { + snapshotProbe: ActorRef[Try[Done]], + retentionProbe: ActorRef[Try[EventSourcedSignal]]): EventSourcedBehavior[Command, Event, State] = { EventSourcedBehavior[Command, Event, State]( persistenceId, emptyState = State(0, Vector.empty), @@ -267,9 +314,10 @@ object EventSourcedBehaviorSpec { snapshotProbe ! Success(Done) case SnapshotFailed(_, failure) ⇒ snapshotProbe ! Failure(failure) + case e: EventSourcedSignal => + retentionProbe ! Success(e) } } - } class EventSourcedBehaviorSpec extends ScalaTestWithActorTestKit(EventSourcedBehaviorSpec.conf) with WordSpecLike { @@ -701,7 +749,88 @@ class EventSourcedBehaviorSpec extends ScalaTestWithActorTestKit(EventSourcedBeh c2 ! Fail probe.expectTerminated(c2) // should fail } + } + "delete snapshots automatically, based on criteria" in { + val unexpected = (signal: EventSourcedSignal) => fail(s"Unexpected signal [$signal].") + + val pid = nextPid + val snapshotProbe = TestProbe[Try[Done]]() + val retentionProbe = TestProbe[Try[EventSourcedSignal]]() + val replyProbe = TestProbe[State]() + + val persistentActor = spawn( + Behaviors.setup[Command]( + ctx ⇒ + counterWithSnapshotAndRetentionProbe(ctx, pid, snapshotProbe.ref, retentionProbe.ref) + .snapshotEvery(2) + .withRetention( + RetentionCriteria(snapshotEveryNEvents = 2, keepNSnapshots = 2, deleteEventsOnSnapshot = false)))) + + persistentActor ! IncrementWithPersistAll(3) + persistentActor ! GetValue(replyProbe.ref) + replyProbe.expectMessage(State(3, Vector(0, 1, 2))) + snapshotProbe.expectMessage(Try(Done)) + retentionProbe.expectMessageType[Success[DeleteSnapshotsCompleted]].value match { + case DeleteSnapshotsCompleted(Criteria(SnapshotSelectionCriteria(maxSequenceNr, _, minSequenceNr, _))) => + maxSequenceNr shouldEqual 2 + minSequenceNr shouldEqual 0 + case signal => unexpected(signal) + } + + persistentActor ! IncrementWithPersistAll(3) + snapshotProbe.expectMessage(Try(Done)) + retentionProbe.expectMessageType[Success[DeleteSnapshotsCompleted]].value match { + case DeleteSnapshotsCompleted(Criteria(SnapshotSelectionCriteria(maxSequenceNr, _, minSequenceNr, _))) => + maxSequenceNr shouldEqual 5 + minSequenceNr shouldEqual 1 + case signal => unexpected(signal) + } + + persistentActor ! GetValue(replyProbe.ref) + replyProbe.expectMessage(State(6, Vector(0, 1, 2, 3, 4, 5))) + } + + "optionally delete both old messages and snapshots" in { + val pid = nextPid + val snapshotProbe = TestProbe[Try[Done]]() + val retentionProbe = TestProbe[Try[EventSourcedSignal]]() + val replyProbe = TestProbe[State]() + + val persistentActor = spawn( + Behaviors.setup[Command]( + ctx ⇒ + counterWithSnapshotAndRetentionProbe(ctx, pid, snapshotProbe.ref, retentionProbe.ref) + .snapshotEvery(2) + .withRetention( + RetentionCriteria(snapshotEveryNEvents = 2, keepNSnapshots = 2, deleteEventsOnSnapshot = true)))) + + persistentActor ! IncrementWithPersistAll(10) + persistentActor ! GetValue(replyProbe.ref) + val initialState = replyProbe.expectMessageType[State] + initialState.value shouldEqual 10 + initialState.history shouldEqual (0 until initialState.value).toVector + snapshotProbe.expectMessage(Try(Done)) + + val firstDeleteMessages = retentionProbe.expectMessageType[Success[DeleteMessagesCompleted]].value + firstDeleteMessages.toSequenceNr shouldEqual 6 // 10 - 2 * 2 + + retentionProbe.expectMessageType[Success[DeleteSnapshotsCompleted]].value match { + case DeleteSnapshotsCompleted(Criteria(SnapshotSelectionCriteria(maxSequenceNr, _, minSequenceNr, _))) => + maxSequenceNr shouldEqual 5 // 10 / 2 + minSequenceNr shouldEqual 1 + case signal => fail(s"Unexpected signal [$signal].") + } + + persistentActor ! IncrementWithPersistAll(10) + snapshotProbe.expectMessage(Try(Done)) + val secondDeleteMessages = retentionProbe.expectMessageType[Success[DeleteMessagesCompleted]].value + secondDeleteMessages.toSequenceNr shouldEqual 16 // 20 - 2 * 2 + + persistentActor ! GetValue(replyProbe.ref) + val state = replyProbe.expectMessageType[State] + state.value shouldEqual 20 + state.history shouldEqual (0 until state.value).toVector } def watcher(toWatch: ActorRef[_]): TestProbe[String] = { diff --git a/akka-persistence/src/main/scala/akka/persistence/JournalProtocol.scala b/akka-persistence/src/main/scala/akka/persistence/JournalProtocol.scala index 1f3cef3b4a..f75447c0de 100644 --- a/akka-persistence/src/main/scala/akka/persistence/JournalProtocol.scala +++ b/akka-persistence/src/main/scala/akka/persistence/JournalProtocol.scala @@ -144,3 +144,13 @@ private[persistence] object JournalProtocol { final case class ReplayMessagesFailure(cause: Throwable) extends Response with DeadLetterSuppression } + +/** + * Reply message to a successful [[JournalProtocol.DeleteMessagesTo]] request. + */ +final case class DeleteMessagesSuccess(toSequenceNr: Long) extends JournalProtocol.Response + +/** + * Reply message to a failed [[JournalProtocol.DeleteMessagesTo]] request. + */ +final case class DeleteMessagesFailure(cause: Throwable, toSequenceNr: Long) extends JournalProtocol.Response diff --git a/akka-persistence/src/main/scala/akka/persistence/PersistentActor.scala b/akka-persistence/src/main/scala/akka/persistence/PersistentActor.scala index a4dc89339d..6d1410b5c9 100644 --- a/akka-persistence/src/main/scala/akka/persistence/PersistentActor.scala +++ b/akka-persistence/src/main/scala/akka/persistence/PersistentActor.scala @@ -6,14 +6,14 @@ package akka.persistence import java.lang.{ Iterable => JIterable } -import akka.actor._ -import akka.japi.Procedure -import akka.japi.Util -import com.typesafe.config.Config import scala.collection.immutable import scala.util.control.NoStackTrace +import akka.actor._ import akka.annotation.InternalApi +import akka.japi.Procedure +import akka.japi.Util +import com.typesafe.config.Config abstract class RecoveryCompleted @@ -29,16 +29,6 @@ case object RecoveryCompleted extends RecoveryCompleted { def getInstance = this } -/** - * Reply message to a successful [[Eventsourced#deleteMessages]] request. - */ -final case class DeleteMessagesSuccess(toSequenceNr: Long) - -/** - * Reply message to a failed [[Eventsourced#deleteMessages]] request. - */ -final case class DeleteMessagesFailure(cause: Throwable, toSequenceNr: Long) - /** * Recovery mode configuration object to be returned in [[PersistentActor#recovery]]. *