parent
94cc028986
commit
1a90e715f5
13 changed files with 617 additions and 428 deletions
|
|
@ -4,14 +4,13 @@
|
|||
|
||||
package jdocs.persistence;
|
||||
|
||||
import static akka.pattern.Patterns.ask;
|
||||
|
||||
import java.sql.Connection;
|
||||
import java.time.Duration;
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
|
||||
import akka.NotUsed;
|
||||
import akka.actor.typed.javadsl.AbstractBehavior;
|
||||
import akka.persistence.query.Sequence;
|
||||
import akka.persistence.query.Offset;
|
||||
import com.typesafe.config.Config;
|
||||
|
|
@ -27,7 +26,6 @@ import org.reactivestreams.Subscriber;
|
|||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CompletionStage;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class PersistenceQueryDocTest {
|
||||
|
||||
|
|
@ -60,6 +58,8 @@ public class PersistenceQueryDocTest {
|
|||
}
|
||||
// #advanced-journal-query-types
|
||||
|
||||
interface OrderCompleted {}
|
||||
|
||||
public
|
||||
// #my-read-journal
|
||||
static class MyReadJournalProvider implements ReadJournalProvider {
|
||||
|
|
@ -266,13 +266,14 @@ public class PersistenceQueryDocTest {
|
|||
|
||||
// #events-by-tag
|
||||
// assuming journal is able to work with numeric offsets we can:
|
||||
final Source<EventEnvelope, NotUsed> blueThings =
|
||||
readJournal.eventsByTag("blue", new Sequence(0L));
|
||||
final Source<EventEnvelope, NotUsed> completedOrders =
|
||||
readJournal.eventsByTag("order-completed", new Sequence(0L));
|
||||
|
||||
// find top 10 blue things:
|
||||
final CompletionStage<List<Object>> top10BlueThings =
|
||||
blueThings
|
||||
// find first 10 completed orders:
|
||||
final CompletionStage<List<OrderCompleted>> firstCompleted =
|
||||
completedOrders
|
||||
.map(EventEnvelope::event)
|
||||
.collectType(OrderCompleted.class)
|
||||
.take(10) // cancels the query stream after pulling 10 elements
|
||||
.runFold(
|
||||
new ArrayList<>(10),
|
||||
|
|
@ -283,7 +284,8 @@ public class PersistenceQueryDocTest {
|
|||
system);
|
||||
|
||||
// start another query, from the known offset
|
||||
Source<EventEnvelope, NotUsed> blue = readJournal.eventsByTag("blue", new Sequence(10));
|
||||
Source<EventEnvelope, NotUsed> furtherOrders =
|
||||
readJournal.eventsByTag("order-completed", new Sequence(10));
|
||||
// #events-by-tag
|
||||
}
|
||||
|
||||
|
|
@ -354,7 +356,7 @@ public class PersistenceQueryDocTest {
|
|||
}
|
||||
|
||||
// #projection-into-different-store-simple-classes
|
||||
class ExampleStore {
|
||||
static class ExampleStore {
|
||||
CompletionStage<Void> save(Object any) {
|
||||
// ...
|
||||
// #projection-into-different-store-simple-classes
|
||||
|
|
@ -383,7 +385,7 @@ public class PersistenceQueryDocTest {
|
|||
}
|
||||
|
||||
// #projection-into-different-store
|
||||
class MyResumableProjection {
|
||||
static class MyResumableProjection {
|
||||
private final String name;
|
||||
|
||||
public MyResumableProjection(String name) {
|
||||
|
|
@ -406,40 +408,7 @@ public class PersistenceQueryDocTest {
|
|||
}
|
||||
// #projection-into-different-store
|
||||
|
||||
void demonstrateWritingIntoDifferentStoreWithResumableProjections() throws Exception {
|
||||
final ActorSystem system = ActorSystem.create();
|
||||
|
||||
final MyJavadslReadJournal readJournal =
|
||||
PersistenceQuery.get(system)
|
||||
.getReadJournalFor(
|
||||
MyJavadslReadJournal.class, "akka.persistence.query.my-read-journal");
|
||||
|
||||
// #projection-into-different-store-actor-run
|
||||
final Duration timeout = Duration.ofSeconds(3);
|
||||
|
||||
final MyResumableProjection bidProjection = new MyResumableProjection("bid");
|
||||
|
||||
final Props writerProps = Props.create(TheOneWhoWritesToQueryJournal.class, "bid");
|
||||
final ActorRef writer = system.actorOf(writerProps, "bid-projection-writer");
|
||||
|
||||
long startFromOffset =
|
||||
bidProjection.latestOffset().toCompletableFuture().get(3, TimeUnit.SECONDS);
|
||||
|
||||
readJournal
|
||||
.eventsByTag("bid", new Sequence(startFromOffset))
|
||||
.mapAsync(
|
||||
8,
|
||||
envelope -> {
|
||||
final CompletionStage<Object> f = ask(writer, envelope.event(), timeout);
|
||||
return f.thenApplyAsync(in -> envelope.offset(), system.dispatcher());
|
||||
})
|
||||
.mapAsync(1, offset -> bidProjection.saveProgress(offset))
|
||||
.runWith(Sink.ignore(), system);
|
||||
}
|
||||
|
||||
// #projection-into-different-store-actor-run
|
||||
|
||||
class ComplexState {
|
||||
static class ComplexState {
|
||||
|
||||
boolean readyToSave() {
|
||||
return false;
|
||||
|
|
@ -451,35 +420,4 @@ public class PersistenceQueryDocTest {
|
|||
return new Record();
|
||||
}
|
||||
}
|
||||
|
||||
// #projection-into-different-store-actor
|
||||
final class TheOneWhoWritesToQueryJournal extends AbstractActor {
|
||||
private final ExampleStore store;
|
||||
|
||||
private ComplexState state = new ComplexState();
|
||||
|
||||
public TheOneWhoWritesToQueryJournal() {
|
||||
store = new ExampleStore();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Receive createReceive() {
|
||||
return receiveBuilder()
|
||||
.matchAny(
|
||||
message -> {
|
||||
state = updateState(state, message);
|
||||
|
||||
// example saving logic that requires state to become ready:
|
||||
if (state.readyToSave()) store.save(Record.of(state));
|
||||
})
|
||||
.build();
|
||||
}
|
||||
|
||||
ComplexState updateState(ComplexState state, Object msg) {
|
||||
// some complicated aggregation logic here ...
|
||||
return state;
|
||||
}
|
||||
}
|
||||
// #projection-into-different-store-actor
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,111 @@
|
|||
/*
|
||||
* Copyright (C) 2019 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package jdocs.persistence;
|
||||
|
||||
import akka.Done;
|
||||
import akka.actor.typed.ActorRef;
|
||||
import akka.actor.typed.ActorSystem;
|
||||
import akka.actor.typed.Behavior;
|
||||
import akka.actor.typed.javadsl.AbstractBehavior;
|
||||
import akka.actor.typed.javadsl.Adapter;
|
||||
import akka.actor.typed.javadsl.Behaviors;
|
||||
import akka.actor.typed.javadsl.Receive;
|
||||
import akka.actor.typed.javadsl.AskPattern;
|
||||
import akka.persistence.query.PersistenceQuery;
|
||||
import akka.persistence.query.Sequence;
|
||||
import akka.stream.javadsl.Sink;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.concurrent.CompletionStage;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static jdocs.persistence.PersistenceQueryDocTest.*;
|
||||
|
||||
public interface ResumableProjectionExample {
|
||||
|
||||
public static void runQuery(
|
||||
ActorSystem<?> system, ActorRef<TheOneWhoWritesToQueryJournal.Command> writer)
|
||||
throws Exception {
|
||||
|
||||
final MyJavadslReadJournal readJournal =
|
||||
PersistenceQuery.get(Adapter.toClassic(system))
|
||||
.getReadJournalFor(
|
||||
MyJavadslReadJournal.class, "akka.persistence.query.my-read-journal");
|
||||
|
||||
// #projection-into-different-store-actor-run
|
||||
final Duration timeout = Duration.ofSeconds(3);
|
||||
|
||||
final MyResumableProjection bidProjection = new MyResumableProjection("bid");
|
||||
|
||||
long startFromOffset =
|
||||
bidProjection.latestOffset().toCompletableFuture().get(3, TimeUnit.SECONDS);
|
||||
|
||||
readJournal
|
||||
.eventsByTag("bid", new Sequence(startFromOffset))
|
||||
.mapAsync(
|
||||
8,
|
||||
envelope -> {
|
||||
final CompletionStage<Done> f =
|
||||
AskPattern.ask(
|
||||
writer,
|
||||
(ActorRef<Done> replyTo) ->
|
||||
new TheOneWhoWritesToQueryJournal.Update(envelope.event(), replyTo),
|
||||
timeout,
|
||||
system.scheduler());
|
||||
return f.thenApplyAsync(in -> envelope.offset(), system.executionContext());
|
||||
})
|
||||
.mapAsync(1, offset -> bidProjection.saveProgress(offset))
|
||||
.runWith(Sink.ignore(), system);
|
||||
}
|
||||
|
||||
// #projection-into-different-store-actor-run
|
||||
|
||||
// #projection-into-different-store-actor
|
||||
static final class TheOneWhoWritesToQueryJournal
|
||||
extends AbstractBehavior<TheOneWhoWritesToQueryJournal.Command> {
|
||||
|
||||
interface Command {}
|
||||
|
||||
static class Update implements Command {
|
||||
public final Object payload;
|
||||
public final ActorRef<Done> replyTo;
|
||||
|
||||
Update(Object payload, ActorRef<Done> replyTo) {
|
||||
this.payload = payload;
|
||||
this.replyTo = replyTo;
|
||||
}
|
||||
}
|
||||
|
||||
public static Behavior<Command> create(String id, ExampleStore store) {
|
||||
return Behaviors.setup(context -> new TheOneWhoWritesToQueryJournal(store));
|
||||
}
|
||||
|
||||
private final ExampleStore store;
|
||||
|
||||
private ComplexState state = new ComplexState();
|
||||
|
||||
private TheOneWhoWritesToQueryJournal(ExampleStore store) {
|
||||
this.store = store;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Receive<Command> createReceive() {
|
||||
return newReceiveBuilder().onMessage(Update.class, this::onUpdate).build();
|
||||
}
|
||||
|
||||
private Behavior<Command> onUpdate(Update msg) {
|
||||
state = updateState(state, msg);
|
||||
if (state.readyToSave()) store.save(Record.of(state));
|
||||
return this;
|
||||
}
|
||||
|
||||
ComplexState updateState(ComplexState state, Update msg) {
|
||||
// some complicated aggregation logic here ...
|
||||
return state;
|
||||
}
|
||||
}
|
||||
// #projection-into-different-store-actor
|
||||
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue