Merge pull request #22146 from akka/wip-22145-exclusive-Sequence-patriknw

use exclusive fromSequenceNumber in eventsByTag, #22145
This commit is contained in:
Patrik Nordwall 2017-01-17 17:48:35 +01:00 committed by GitHub
commit c61ed5a240
14 changed files with 100 additions and 33 deletions

View file

@ -101,6 +101,17 @@ public class PersistenceQueryDocTest {
TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
}
/**
* You can use `NoOffset` to retrieve all events with a given tag or retrieve a subset of all
* events by specifying a `Sequence` `offset`. The `offset` corresponds to an ordered sequence number for
* the specific tag. Note that the corresponding offset of each event is provided in the
* [[akka.persistence.query.EventEnvelope]], which makes it possible to resume the
* stream at a later point from a given offset.
*
* The `offset` is exclusive, i.e. the event with the exact same sequence number will not be included
* in the returned stream. This means that you can use the offset that is returned in `EventEnvelope`
* as the `offset` parameter in a subsequent query.
*/
@Override
public Source<EventEnvelope, NotUsed> eventsByTag(String tag, Offset offset) {
if(offset instanceof Sequence){
@ -108,7 +119,8 @@ public class PersistenceQueryDocTest {
final Props props = MyEventsByTagPublisher.props(tag, sequenceOffset.value(), refreshInterval);
return Source.<EventEnvelope>actorPublisher(props).
mapMaterializedValue(m -> NotUsed.getInstance());
}
} else if (offset == NoOffset.getInstance())
return eventsByTag(tag, Offset.sequence(0L)); //recursive
else
throw new IllegalArgumentException("MyJavadslReadJournal does not support " + offset.getClass().getName() + " offsets");
}

View file

@ -84,7 +84,7 @@ class MyEventsByTagJavaPublisher extends AbstractActorPublisher<EventEnvelope> {
private void query() {
if (buf.isEmpty()) {
final String query = "SELECT id, persistent_repr " +
"FROM journal WHERE tag = ? AND id >= ? " +
"FROM journal WHERE tag = ? AND id > ? " +
"ORDER BY id LIMIT ?";
try (PreparedStatement s = connection.prepareStatement(query)) {