parent
e74f77236c
commit
fc4e84a322
3 changed files with 4 additions and 192 deletions
|
|
@ -254,26 +254,11 @@ Java
|
||||||
|
|
||||||
### Resumable projections
|
### Resumable projections
|
||||||
|
|
||||||
Sometimes you may need to implement "resumable" projections, that will not start from the beginning of time each time
|
Sometimes you may need to use "resumable" projections, which will not start from the beginning of time each time
|
||||||
when run. In this case you will need to store the sequence number (or `offset`) of the processed event and use it
|
when run. In such case, the sequence number (or `offset`) of the processed event will be stored and
|
||||||
the next time this projection is started. This pattern is not built-in, however is rather simple to implement yourself.
|
used the next time this projection is started. This pattern is implemented in the
|
||||||
|
[Akka Projections](https://doc.akka.io/docs/akka-projection/current/) module.
|
||||||
|
|
||||||
The example below additionally highlights how you would use Actors to implement the write side, in case
|
|
||||||
you need to do some complex logic that would be best handled inside an Actor before persisting the event
|
|
||||||
into the other datastore:
|
|
||||||
|
|
||||||
Scala
|
|
||||||
: @@snip [PersistenceQueryDocSpec.scala](/akka-docs/src/test/scala/docs/persistence/query/PersistenceQueryDocSpec.scala) { #projection-into-different-store-actor-run }
|
|
||||||
|
|
||||||
Java
|
|
||||||
: @@snip [ResumableProjectionExample.java](/akka-docs/src/test/java/jdocs/persistence/ResumableProjectionExample.java) { #projection-into-different-store-actor-run }
|
|
||||||
|
|
||||||
|
|
||||||
Scala
|
|
||||||
: @@snip [PersistenceQueryDocSpec.scala](/akka-docs/src/test/scala/docs/persistence/query/PersistenceQueryDocSpec.scala) { #projection-into-different-store-actor }
|
|
||||||
|
|
||||||
Java
|
|
||||||
: @@snip [ResumableProjectionExample.java](/akka-docs/src/test/java/jdocs/persistence/ResumableProjectionExample.java) { #projection-into-different-store-actor }
|
|
||||||
|
|
||||||
<a id="read-journal-plugin-api"></a>
|
<a id="read-journal-plugin-api"></a>
|
||||||
## Query plugins
|
## Query plugins
|
||||||
|
|
|
||||||
|
|
@ -1,112 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright (C) 2019-2021 Lightbend Inc. <https://www.lightbend.com>
|
|
||||||
*/
|
|
||||||
|
|
||||||
package jdocs.persistence;
|
|
||||||
|
|
||||||
import akka.Done;
|
|
||||||
import akka.actor.typed.ActorRef;
|
|
||||||
import akka.actor.typed.ActorSystem;
|
|
||||||
import akka.actor.typed.Behavior;
|
|
||||||
import akka.actor.typed.javadsl.AbstractBehavior;
|
|
||||||
import akka.actor.typed.javadsl.ActorContext;
|
|
||||||
import akka.actor.typed.javadsl.Behaviors;
|
|
||||||
import akka.actor.typed.javadsl.Receive;
|
|
||||||
import akka.actor.typed.javadsl.AskPattern;
|
|
||||||
import akka.persistence.query.PersistenceQuery;
|
|
||||||
import akka.persistence.query.Sequence;
|
|
||||||
import akka.stream.javadsl.Sink;
|
|
||||||
|
|
||||||
import java.time.Duration;
|
|
||||||
import java.util.concurrent.CompletionStage;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
|
|
||||||
import static jdocs.persistence.PersistenceQueryDocTest.*;
|
|
||||||
|
|
||||||
public interface ResumableProjectionExample {
|
|
||||||
|
|
||||||
public static void runQuery(
|
|
||||||
ActorSystem<?> system, ActorRef<TheOneWhoWritesToQueryJournal.Command> writer)
|
|
||||||
throws Exception {
|
|
||||||
|
|
||||||
final MyJavadslReadJournal readJournal =
|
|
||||||
PersistenceQuery.get(system)
|
|
||||||
.getReadJournalFor(
|
|
||||||
MyJavadslReadJournal.class, "akka.persistence.query.my-read-journal");
|
|
||||||
|
|
||||||
// #projection-into-different-store-actor-run
|
|
||||||
final Duration timeout = Duration.ofSeconds(3);
|
|
||||||
|
|
||||||
final MyResumableProjection bidProjection = new MyResumableProjection("bid");
|
|
||||||
|
|
||||||
long startFromOffset =
|
|
||||||
bidProjection.latestOffset().toCompletableFuture().get(3, TimeUnit.SECONDS);
|
|
||||||
|
|
||||||
readJournal
|
|
||||||
.eventsByTag("bid", new Sequence(startFromOffset))
|
|
||||||
.mapAsync(
|
|
||||||
8,
|
|
||||||
envelope -> {
|
|
||||||
final CompletionStage<Done> f =
|
|
||||||
AskPattern.ask(
|
|
||||||
writer,
|
|
||||||
(ActorRef<Done> replyTo) ->
|
|
||||||
new TheOneWhoWritesToQueryJournal.Update(envelope.event(), replyTo),
|
|
||||||
timeout,
|
|
||||||
system.scheduler());
|
|
||||||
return f.thenApplyAsync(in -> envelope.offset(), system.executionContext());
|
|
||||||
})
|
|
||||||
.mapAsync(1, offset -> bidProjection.saveProgress(offset))
|
|
||||||
.runWith(Sink.ignore(), system);
|
|
||||||
}
|
|
||||||
|
|
||||||
// #projection-into-different-store-actor-run
|
|
||||||
|
|
||||||
// #projection-into-different-store-actor
|
|
||||||
static final class TheOneWhoWritesToQueryJournal
|
|
||||||
extends AbstractBehavior<TheOneWhoWritesToQueryJournal.Command> {
|
|
||||||
|
|
||||||
interface Command {}
|
|
||||||
|
|
||||||
static class Update implements Command {
|
|
||||||
public final Object payload;
|
|
||||||
public final ActorRef<Done> replyTo;
|
|
||||||
|
|
||||||
Update(Object payload, ActorRef<Done> replyTo) {
|
|
||||||
this.payload = payload;
|
|
||||||
this.replyTo = replyTo;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public static Behavior<Command> create(String id, ExampleStore store) {
|
|
||||||
return Behaviors.setup(context -> new TheOneWhoWritesToQueryJournal(context, store));
|
|
||||||
}
|
|
||||||
|
|
||||||
private final ExampleStore store;
|
|
||||||
|
|
||||||
private ComplexState state = new ComplexState();
|
|
||||||
|
|
||||||
private TheOneWhoWritesToQueryJournal(ActorContext<Command> context, ExampleStore store) {
|
|
||||||
super(context);
|
|
||||||
this.store = store;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Receive<Command> createReceive() {
|
|
||||||
return newReceiveBuilder().onMessage(Update.class, this::onUpdate).build();
|
|
||||||
}
|
|
||||||
|
|
||||||
private Behavior<Command> onUpdate(Update msg) {
|
|
||||||
state = updateState(state, msg);
|
|
||||||
if (state.readyToSave()) store.save(Record.of(state));
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
ComplexState updateState(ComplexState state, Update msg) {
|
|
||||||
// some complicated aggregation logic here ...
|
|
||||||
return state;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// #projection-into-different-store-actor
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
@ -167,33 +167,6 @@ object PersistenceQueryDocSpec {
|
||||||
//#projection-into-different-store-rs
|
//#projection-into-different-store-rs
|
||||||
}
|
}
|
||||||
|
|
||||||
import akka.actor.typed.ActorRef
|
|
||||||
//#projection-into-different-store-actor
|
|
||||||
object TheOneWhoWritesToQueryJournal {
|
|
||||||
|
|
||||||
sealed trait Command
|
|
||||||
final case class Update(payload: Any, replyTo: ActorRef[Done]) extends Command
|
|
||||||
|
|
||||||
def apply(id: String, store: ExampleStore): Behavior[Command] = {
|
|
||||||
updated(ComplexState(), store)
|
|
||||||
}
|
|
||||||
|
|
||||||
private def updated(state: ComplexState, store: ExampleStore): Behavior[Command] = {
|
|
||||||
Behaviors.receiveMessage {
|
|
||||||
case command: Update =>
|
|
||||||
val newState = updateState(state, command)
|
|
||||||
if (state.readyToSave) store.save(Record(state))
|
|
||||||
updated(newState, store)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private def updateState(state: ComplexState, command: Command): ComplexState = {
|
|
||||||
// some complicated aggregation logic here ...
|
|
||||||
state
|
|
||||||
}
|
|
||||||
}
|
|
||||||
//#projection-into-different-store-actor
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
class PersistenceQueryDocSpec(s: String) extends AkkaSpec(s) {
|
class PersistenceQueryDocSpec(s: String) extends AkkaSpec(s) {
|
||||||
|
|
@ -282,40 +255,6 @@ class PersistenceQueryDocSpec(s: String) extends AkkaSpec(s) {
|
||||||
}
|
}
|
||||||
//#projection-into-different-store
|
//#projection-into-different-store
|
||||||
|
|
||||||
class RunWithActor {
|
|
||||||
import akka.actor.typed.ActorSystem
|
|
||||||
import akka.actor.typed.ActorRef
|
|
||||||
import akka.actor.typed.scaladsl.adapter._
|
|
||||||
import akka.actor.typed.scaladsl.AskPattern._
|
|
||||||
|
|
||||||
//#projection-into-different-store-actor-run
|
|
||||||
def runQuery(writer: ActorRef[TheOneWhoWritesToQueryJournal.Command])(implicit system: ActorSystem[_]): Unit = {
|
|
||||||
|
|
||||||
val readJournal =
|
|
||||||
PersistenceQuery(system.toClassic).readJournalFor[MyScaladslReadJournal](JournalId)
|
|
||||||
|
|
||||||
import system.executionContext
|
|
||||||
implicit val timeout = Timeout(3.seconds)
|
|
||||||
|
|
||||||
val bidProjection = new MyResumableProjection("bid")
|
|
||||||
|
|
||||||
bidProjection.latestOffset.foreach { startFromOffset =>
|
|
||||||
readJournal
|
|
||||||
.eventsByTag("bid", Sequence(startFromOffset))
|
|
||||||
.mapAsync(8) { envelope =>
|
|
||||||
writer
|
|
||||||
.ask((replyTo: ActorRef[Done]) => TheOneWhoWritesToQueryJournal.Update(envelope.event, replyTo))
|
|
||||||
.map(_ => envelope.offset)
|
|
||||||
}
|
|
||||||
.mapAsync(1) { offset =>
|
|
||||||
bidProjection.saveProgress(offset)
|
|
||||||
}
|
|
||||||
.runWith(Sink.ignore)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
//#projection-into-different-store-actor-run
|
|
||||||
}
|
|
||||||
|
|
||||||
class RunWithAsyncFunction {
|
class RunWithAsyncFunction {
|
||||||
val readJournal =
|
val readJournal =
|
||||||
PersistenceQuery(system).readJournalFor[MyScaladslReadJournal]("akka.persistence.query.my-read-journal")
|
PersistenceQuery(system).readJournalFor[MyScaladslReadJournal]("akka.persistence.query.my-read-journal")
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue