diff --git a/akka-docs/src/main/paradox/persistence-query.md b/akka-docs/src/main/paradox/persistence-query.md index 02a14318be..5c9b1bc0a7 100644 --- a/akka-docs/src/main/paradox/persistence-query.md +++ b/akka-docs/src/main/paradox/persistence-query.md @@ -254,26 +254,11 @@ Java ### Resumable projections -Sometimes you may need to implement "resumable" projections, that will not start from the beginning of time each time -when run. In this case you will need to store the sequence number (or `offset`) of the processed event and use it -the next time this projection is started. This pattern is not built-in, however is rather simple to implement yourself. +Sometimes you may need to use "resumable" projections, which will not start from the beginning of time each time +when run. In such case, the sequence number (or `offset`) of the processed event will be stored and +used the next time this projection is started. This pattern is implemented in the +[Akka Projections](https://doc.akka.io/docs/akka-projection/current/) module. -The example below additionally highlights how you would use Actors to implement the write side, in case -you need to do some complex logic that would be best handled inside an Actor before persisting the event -into the other datastore: - -Scala -: @@snip [PersistenceQueryDocSpec.scala](/akka-docs/src/test/scala/docs/persistence/query/PersistenceQueryDocSpec.scala) { #projection-into-different-store-actor-run } - -Java -: @@snip [ResumableProjectionExample.java](/akka-docs/src/test/java/jdocs/persistence/ResumableProjectionExample.java) { #projection-into-different-store-actor-run } - - -Scala -: @@snip [PersistenceQueryDocSpec.scala](/akka-docs/src/test/scala/docs/persistence/query/PersistenceQueryDocSpec.scala) { #projection-into-different-store-actor } - -Java -: @@snip [ResumableProjectionExample.java](/akka-docs/src/test/java/jdocs/persistence/ResumableProjectionExample.java) { #projection-into-different-store-actor } ## Query plugins diff --git a/akka-docs/src/test/java/jdocs/persistence/ResumableProjectionExample.java b/akka-docs/src/test/java/jdocs/persistence/ResumableProjectionExample.java deleted file mode 100644 index b663485958..0000000000 --- a/akka-docs/src/test/java/jdocs/persistence/ResumableProjectionExample.java +++ /dev/null @@ -1,112 +0,0 @@ -/* - * Copyright (C) 2019-2021 Lightbend Inc. - */ - -package jdocs.persistence; - -import akka.Done; -import akka.actor.typed.ActorRef; -import akka.actor.typed.ActorSystem; -import akka.actor.typed.Behavior; -import akka.actor.typed.javadsl.AbstractBehavior; -import akka.actor.typed.javadsl.ActorContext; -import akka.actor.typed.javadsl.Behaviors; -import akka.actor.typed.javadsl.Receive; -import akka.actor.typed.javadsl.AskPattern; -import akka.persistence.query.PersistenceQuery; -import akka.persistence.query.Sequence; -import akka.stream.javadsl.Sink; - -import java.time.Duration; -import java.util.concurrent.CompletionStage; -import java.util.concurrent.TimeUnit; - -import static jdocs.persistence.PersistenceQueryDocTest.*; - -public interface ResumableProjectionExample { - - public static void runQuery( - ActorSystem system, ActorRef writer) - throws Exception { - - final MyJavadslReadJournal readJournal = - PersistenceQuery.get(system) - .getReadJournalFor( - MyJavadslReadJournal.class, "akka.persistence.query.my-read-journal"); - - // #projection-into-different-store-actor-run - final Duration timeout = Duration.ofSeconds(3); - - final MyResumableProjection bidProjection = new MyResumableProjection("bid"); - - long startFromOffset = - bidProjection.latestOffset().toCompletableFuture().get(3, TimeUnit.SECONDS); - - readJournal - .eventsByTag("bid", new Sequence(startFromOffset)) - .mapAsync( - 8, - envelope -> { - final CompletionStage f = - AskPattern.ask( - writer, - (ActorRef replyTo) -> - new TheOneWhoWritesToQueryJournal.Update(envelope.event(), replyTo), - timeout, - system.scheduler()); - return f.thenApplyAsync(in -> envelope.offset(), system.executionContext()); - }) - .mapAsync(1, offset -> bidProjection.saveProgress(offset)) - .runWith(Sink.ignore(), system); - } - - // #projection-into-different-store-actor-run - - // #projection-into-different-store-actor - static final class TheOneWhoWritesToQueryJournal - extends AbstractBehavior { - - interface Command {} - - static class Update implements Command { - public final Object payload; - public final ActorRef replyTo; - - Update(Object payload, ActorRef replyTo) { - this.payload = payload; - this.replyTo = replyTo; - } - } - - public static Behavior create(String id, ExampleStore store) { - return Behaviors.setup(context -> new TheOneWhoWritesToQueryJournal(context, store)); - } - - private final ExampleStore store; - - private ComplexState state = new ComplexState(); - - private TheOneWhoWritesToQueryJournal(ActorContext context, ExampleStore store) { - super(context); - this.store = store; - } - - @Override - public Receive createReceive() { - return newReceiveBuilder().onMessage(Update.class, this::onUpdate).build(); - } - - private Behavior onUpdate(Update msg) { - state = updateState(state, msg); - if (state.readyToSave()) store.save(Record.of(state)); - return this; - } - - ComplexState updateState(ComplexState state, Update msg) { - // some complicated aggregation logic here ... - return state; - } - } - // #projection-into-different-store-actor - -} diff --git a/akka-docs/src/test/scala/docs/persistence/query/PersistenceQueryDocSpec.scala b/akka-docs/src/test/scala/docs/persistence/query/PersistenceQueryDocSpec.scala index fd52c0fcb3..8f8af6211d 100644 --- a/akka-docs/src/test/scala/docs/persistence/query/PersistenceQueryDocSpec.scala +++ b/akka-docs/src/test/scala/docs/persistence/query/PersistenceQueryDocSpec.scala @@ -167,33 +167,6 @@ object PersistenceQueryDocSpec { //#projection-into-different-store-rs } - import akka.actor.typed.ActorRef - //#projection-into-different-store-actor - object TheOneWhoWritesToQueryJournal { - - sealed trait Command - final case class Update(payload: Any, replyTo: ActorRef[Done]) extends Command - - def apply(id: String, store: ExampleStore): Behavior[Command] = { - updated(ComplexState(), store) - } - - private def updated(state: ComplexState, store: ExampleStore): Behavior[Command] = { - Behaviors.receiveMessage { - case command: Update => - val newState = updateState(state, command) - if (state.readyToSave) store.save(Record(state)) - updated(newState, store) - } - } - - private def updateState(state: ComplexState, command: Command): ComplexState = { - // some complicated aggregation logic here ... - state - } - } - //#projection-into-different-store-actor - } class PersistenceQueryDocSpec(s: String) extends AkkaSpec(s) { @@ -282,40 +255,6 @@ class PersistenceQueryDocSpec(s: String) extends AkkaSpec(s) { } //#projection-into-different-store - class RunWithActor { - import akka.actor.typed.ActorSystem - import akka.actor.typed.ActorRef - import akka.actor.typed.scaladsl.adapter._ - import akka.actor.typed.scaladsl.AskPattern._ - - //#projection-into-different-store-actor-run - def runQuery(writer: ActorRef[TheOneWhoWritesToQueryJournal.Command])(implicit system: ActorSystem[_]): Unit = { - - val readJournal = - PersistenceQuery(system.toClassic).readJournalFor[MyScaladslReadJournal](JournalId) - - import system.executionContext - implicit val timeout = Timeout(3.seconds) - - val bidProjection = new MyResumableProjection("bid") - - bidProjection.latestOffset.foreach { startFromOffset => - readJournal - .eventsByTag("bid", Sequence(startFromOffset)) - .mapAsync(8) { envelope => - writer - .ask((replyTo: ActorRef[Done]) => TheOneWhoWritesToQueryJournal.Update(envelope.event, replyTo)) - .map(_ => envelope.offset) - } - .mapAsync(1) { offset => - bidProjection.saveProgress(offset) - } - .runWith(Sink.ignore) - } - } - //#projection-into-different-store-actor-run - } - class RunWithAsyncFunction { val readJournal = PersistenceQuery(system).readJournalFor[MyScaladslReadJournal]("akka.persistence.query.my-read-journal")