/** * Copyright (C) 2009-2016 Lightbend Inc. */ package docs.persistence; import static akka.pattern.PatternsCS.ask; import java.util.HashSet; import java.util.Set; import akka.NotUsed; import akka.persistence.query.Sequence; import akka.persistence.query.Offset; import com.typesafe.config.Config; import akka.actor.*; import akka.japi.pf.ReceiveBuilder; import akka.persistence.query.*; import akka.stream.ActorMaterializer; import akka.stream.javadsl.Sink; import akka.stream.javadsl.Source; import akka.util.Timeout; import docs.persistence.query.MyEventsByTagPublisher; import org.reactivestreams.Subscriber; import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit; public class PersistenceQueryDocTest { final ActorSystem system = ActorSystem.create(); final ActorMaterializer mat = ActorMaterializer.create(system); static //#advanced-journal-query-types public class RichEvent { public final Settags; public final Object payload; public RichEvent(Set tags, Object payload) { this.tags = tags; this.payload = payload; } } //#advanced-journal-query-types static //#advanced-journal-query-types // a plugin can provide: public final class QueryMetadata{ public final boolean deterministicOrder; public final boolean infinite; public QueryMetadata(boolean deterministicOrder, boolean infinite) { this.deterministicOrder = deterministicOrder; this.infinite = infinite; } } //#advanced-journal-query-types static //#my-read-journal public class MyReadJournalProvider implements ReadJournalProvider { private final MyJavadslReadJournal javadslReadJournal; public MyReadJournalProvider(ExtendedActorSystem system, Config config) { this.javadslReadJournal = new MyJavadslReadJournal(system, config); } @Override public MyScaladslReadJournal scaladslReadJournal() { return new MyScaladslReadJournal(javadslReadJournal); } @Override public MyJavadslReadJournal javadslReadJournal() { return this.javadslReadJournal; } } //#my-read-journal static //#my-read-journal public class MyJavadslReadJournal implements akka.persistence.query.javadsl.ReadJournal, akka.persistence.query.javadsl.EventsByTagQuery2, akka.persistence.query.javadsl.EventsByPersistenceIdQuery, akka.persistence.query.javadsl.AllPersistenceIdsQuery, akka.persistence.query.javadsl.CurrentPersistenceIdsQuery { private final FiniteDuration refreshInterval; public MyJavadslReadJournal(ExtendedActorSystem system, Config config) { refreshInterval = FiniteDuration.create(config.getDuration("refresh-interval", TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS); } @Override public Source eventsByTag(String tag, Offset offset) { if(offset instanceof Sequence){ Sequence sequenceOffset = (Sequence) offset; final Props props = MyEventsByTagPublisher.props(tag, sequenceOffset.value(), refreshInterval); return Source.actorPublisher(props). mapMaterializedValue(m -> NotUsed.getInstance()); } else throw new IllegalArgumentException("MyJavadslReadJournal does not support " + offset.getClass().getName() + " offsets"); } @Override public Source eventsByPersistenceId(String persistenceId, long fromSequenceNr, long toSequenceNr) { // implement in a similar way as eventsByTag throw new UnsupportedOperationException("Not implemented yet"); } @Override public Source allPersistenceIds() { // implement in a similar way as eventsByTag throw new UnsupportedOperationException("Not implemented yet"); } @Override public Source currentPersistenceIds() { // implement in a similar way as eventsByTag throw new UnsupportedOperationException("Not implemented yet"); } // possibility to add more plugin specific queries //#advanced-journal-query-definition public Source byTagsWithMeta(Set tags) { //#advanced-journal-query-definition // implement in a similar way as eventsByTag throw new UnsupportedOperationException("Not implemented yet"); } } //#my-read-journal static //#my-read-journal public class MyScaladslReadJournal implements akka.persistence.query.scaladsl.ReadJournal, akka.persistence.query.scaladsl.EventsByTagQuery2, akka.persistence.query.scaladsl.EventsByPersistenceIdQuery, akka.persistence.query.scaladsl.AllPersistenceIdsQuery, akka.persistence.query.scaladsl.CurrentPersistenceIdsQuery { private final MyJavadslReadJournal javadslReadJournal; public MyScaladslReadJournal(MyJavadslReadJournal javadslReadJournal) { this.javadslReadJournal = javadslReadJournal; } @Override public akka.stream.scaladsl.Source eventsByTag( String tag, akka.persistence.query.Offset offset) { return javadslReadJournal.eventsByTag(tag, offset).asScala(); } @Override public akka.stream.scaladsl.Source eventsByPersistenceId( String persistenceId, long fromSequenceNr, long toSequenceNr) { return javadslReadJournal.eventsByPersistenceId(persistenceId, fromSequenceNr, toSequenceNr).asScala(); } @Override public akka.stream.scaladsl.Source allPersistenceIds() { return javadslReadJournal.allPersistenceIds().asScala(); } @Override public akka.stream.scaladsl.Source currentPersistenceIds() { return javadslReadJournal.currentPersistenceIds().asScala(); } // possibility to add more plugin specific queries public akka.stream.scaladsl.Source byTagsWithMeta( scala.collection.Set tags) { Set jTags = scala.collection.JavaConversions.setAsJavaSet(tags); return javadslReadJournal.byTagsWithMeta(jTags).asScala(); } } //#my-read-journal void demonstrateBasicUsage() { final ActorSystem system = ActorSystem.create(); //#basic-usage // obtain read journal by plugin id final MyJavadslReadJournal readJournal = PersistenceQuery.get(system).getReadJournalFor(MyJavadslReadJournal.class, "akka.persistence.query.my-read-journal"); // issue query to journal Source source = readJournal.eventsByPersistenceId("user-1337", 0, Long.MAX_VALUE); // materialize stream, consuming events ActorMaterializer mat = ActorMaterializer.create(system); source.runForeach(event -> System.out.println("Event: " + event), mat); //#basic-usage } void demonstrateAllPersistenceIdsLive() { final MyJavadslReadJournal readJournal = PersistenceQuery.get(system).getReadJournalFor(MyJavadslReadJournal.class, "akka.persistence.query.my-read-journal"); //#all-persistence-ids-live readJournal.allPersistenceIds(); //#all-persistence-ids-live } void demonstrateNoRefresh() { final ActorSystem system = ActorSystem.create(); final MyJavadslReadJournal readJournal = PersistenceQuery.get(system).getReadJournalFor(MyJavadslReadJournal.class, "akka.persistence.query.my-read-journal"); //#all-persistence-ids-snap readJournal.currentPersistenceIds(); //#all-persistence-ids-snap } void demonstrateRefresh() { final ActorSystem system = ActorSystem.create(); final MyJavadslReadJournal readJournal = PersistenceQuery.get(system).getReadJournalFor(MyJavadslReadJournal.class, "akka.persistence.query.my-read-journal"); //#events-by-persistent-id readJournal.eventsByPersistenceId("user-us-1337", 0L, Long.MAX_VALUE); //#events-by-persistent-id } void demonstrateEventsByTag() { final ActorSystem system = ActorSystem.create(); final ActorMaterializer mat = ActorMaterializer.create(system); final MyJavadslReadJournal readJournal = PersistenceQuery.get(system).getReadJournalFor(MyJavadslReadJournal.class, "akka.persistence.query.my-read-journal"); //#events-by-tag // assuming journal is able to work with numeric offsets we can: final Source blueThings = readJournal.eventsByTag("blue", new Sequence(0L)); // find top 10 blue things: final Future> top10BlueThings = (Future>) blueThings .map(t -> t.event()) .take(10) // cancels the query stream after pulling 10 elements .>runFold(new ArrayList<>(10), (acc, e) -> { acc.add(e); return acc; }, mat); // start another query, from the known offset Source blue = readJournal.eventsByTag("blue", new Sequence(10)); //#events-by-tag } void demonstrateMaterializedQueryValues() { final ActorSystem system = ActorSystem.create(); final ActorMaterializer mat = ActorMaterializer.create(system); final MyJavadslReadJournal readJournal = PersistenceQuery.get(system).getReadJournalFor(MyJavadslReadJournal.class, "akka.persistence.query.my-read-journal"); //#advanced-journal-query-usage Set tags = new HashSet(); tags.add("red"); tags.add("blue"); final Source events = readJournal.byTagsWithMeta(tags) .mapMaterializedValue(meta -> { System.out.println("The query is: " + "ordered deterministically: " + meta.deterministicOrder + " " + "infinite: " + meta.infinite); return meta; }); events.map(event -> { System.out.println("Event payload: " + event.payload); return event.payload; }).runWith(Sink.ignore(), mat); //#advanced-journal-query-usage } class ReactiveStreamsCompatibleDBDriver { Subscriber> batchWriter() { return null; } } void demonstrateWritingIntoDifferentStore() { final ActorSystem system = ActorSystem.create(); final ActorMaterializer mat = ActorMaterializer.create(system); final MyJavadslReadJournal readJournal = PersistenceQuery.get(system).getReadJournalFor(MyJavadslReadJournal.class, "akka.persistence.query.my-read-journal"); //#projection-into-different-store-rs final ReactiveStreamsCompatibleDBDriver driver = new ReactiveStreamsCompatibleDBDriver(); final Subscriber> dbBatchWriter = driver.batchWriter(); // Using an example (Reactive Streams) Database driver readJournal .eventsByPersistenceId("user-1337", 0L, Long.MAX_VALUE) .map(envelope -> envelope.event()) .grouped(20) // batch inserts into groups of 20 .runWith(Sink.fromSubscriber(dbBatchWriter), mat); // write batches to read-side database //#projection-into-different-store-rs } //#projection-into-different-store-simple-classes class ExampleStore { CompletionStage save(Object any) { // ... //#projection-into-different-store-simple-classes return null; //#projection-into-different-store-simple-classes } } //#projection-into-different-store-simple-classes void demonstrateWritingIntoDifferentStoreWithMapAsync() { final ActorSystem system = ActorSystem.create(); final ActorMaterializer mat = ActorMaterializer.create(system); final MyJavadslReadJournal readJournal = PersistenceQuery.get(system).getReadJournalFor(MyJavadslReadJournal.class, "akka.persistence.query.my-read-journal"); //#projection-into-different-store-simple final ExampleStore store = new ExampleStore(); readJournal .eventsByTag("bid", new Sequence(0L)) .mapAsync(1, store::save) .runWith(Sink.ignore(), mat); //#projection-into-different-store-simple } //#projection-into-different-store class MyResumableProjection { private final String name; public MyResumableProjection(String name) { this.name = name; } public CompletionStage saveProgress(Offset offset) { // ... //#projection-into-different-store return null; //#projection-into-different-store } public CompletionStage latestOffset() { // ... //#projection-into-different-store return null; //#projection-into-different-store } } //#projection-into-different-store void demonstrateWritingIntoDifferentStoreWithResumableProjections() throws Exception { final ActorSystem system = ActorSystem.create(); final ActorMaterializer mat = ActorMaterializer.create(system); final MyJavadslReadJournal readJournal = PersistenceQuery.get(system).getReadJournalFor(MyJavadslReadJournal.class, "akka.persistence.query.my-read-journal"); //#projection-into-different-store-actor-run final Timeout timeout = Timeout.apply(3, TimeUnit.SECONDS); final MyResumableProjection bidProjection = new MyResumableProjection("bid"); final Props writerProps = Props.create(TheOneWhoWritesToQueryJournal.class, "bid"); final ActorRef writer = system.actorOf(writerProps, "bid-projection-writer"); long startFromOffset = bidProjection.latestOffset().toCompletableFuture().get(3, TimeUnit.SECONDS); readJournal .eventsByTag("bid", new Sequence(startFromOffset)) .mapAsync(8, envelope -> { final CompletionStage f = ask(writer, envelope.event(), timeout); return f.thenApplyAsync(in -> envelope.offset(), system.dispatcher()); }) .mapAsync(1, offset -> bidProjection.saveProgress(offset)) .runWith(Sink.ignore(), mat); } //#projection-into-different-store-actor-run class ComplexState { boolean readyToSave() { return false; } } static class Record { static Record of(Object any) { return new Record(); } } //#projection-into-different-store-actor final class TheOneWhoWritesToQueryJournal extends AbstractActor { private final ExampleStore store; private ComplexState state = new ComplexState(); public TheOneWhoWritesToQueryJournal() { store = new ExampleStore(); receive(ReceiveBuilder.matchAny(message -> { state = updateState(state, message); // example saving logic that requires state to become ready: if (state.readyToSave()) store.save(Record.of(state)); }).build()); } ComplexState updateState(ComplexState state, Object msg) { // some complicated aggregation logic here ... return state; } } //#projection-into-different-store-actor }