use exclusive fromSequenceNumber in eventsByTag, #22145

* The reason is to have a consistent approach for Sequence and
  TimeBasedUUID, which are both intended as unique event identifiers.
* This means that you can use the offset that is returned in `EventEnvelope`
  as the `offset` parameter in a subsequent query.
This commit is contained in:
Patrik Nordwall 2017-01-16 11:13:45 +01:00
parent 4a9c753710
commit 8083c0bf4a
15 changed files with 101 additions and 34 deletions

View file

@ -101,6 +101,17 @@ public class PersistenceQueryDocTest {
TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
}
/**
* You can use `NoOffset` to retrieve all events with a given tag or retrieve a subset of all
* events by specifying a `Sequence` `offset`. The `offset` corresponds to an ordered sequence number for
* the specific tag. Note that the corresponding offset of each event is provided in the
* [[akka.persistence.query.EventEnvelope]], which makes it possible to resume the
* stream at a later point from a given offset.
*
* The `offset` is exclusive, i.e. the event with the exact same sequence number will not be included
* in the returned stream. This means that you can use the offset that is returned in `EventEnvelope`
* as the `offset` parameter in a subsequent query.
*/
@Override
public Source<EventEnvelope, NotUsed> eventsByTag(String tag, Offset offset) {
if(offset instanceof Sequence){
@ -108,7 +119,8 @@ public class PersistenceQueryDocTest {
final Props props = MyEventsByTagPublisher.props(tag, sequenceOffset.value(), refreshInterval);
return Source.<EventEnvelope>actorPublisher(props).
mapMaterializedValue(m -> NotUsed.getInstance());
}
} else if (offset == NoOffset.getInstance())
return eventsByTag(tag, Offset.sequence(0L)); //recursive
else
throw new IllegalArgumentException("MyJavadslReadJournal does not support " + offset.getClass().getName() + " offsets");
}

View file

@ -84,7 +84,7 @@ class MyEventsByTagJavaPublisher extends AbstractActorPublisher<EventEnvelope> {
private void query() {
if (buf.isEmpty()) {
final String query = "SELECT id, persistent_repr " +
"FROM journal WHERE tag = ? AND id >= ? " +
"FROM journal WHERE tag = ? AND id > ? " +
"ORDER BY id LIMIT ?";
try (PreparedStatement s = connection.prepareStatement(query)) {

View file

@ -95,10 +95,14 @@ with the given ``tags``.
.. includecode:: code/docs/persistence/query/LeveldbPersistenceQueryDocTest.java#tagger
You can retrieve a subset of all events by specifying ``offset``, or use ``0L`` to retrieve all
events with a given tag. The ``offset`` corresponds to an ordered sequence number for the specific tag.
Note that the corresponding offset of each event is provided in the ``EventEnvelope``, which makes it possible
to resume the stream at a later point from a given offset.
You can use ``NoOffset`` to retrieve all events with a given tag or retrieve a subset of all
events by specifying a ``Sequence`` ``offset``. The ``offset`` corresponds to an ordered sequence number for
the specific tag. Note that the corresponding offset of each event is provided in the
``EventEnvelope``, which makes it possible to resume the stream at a later point from a given offset.
The ``offset`` is exclusive, i.e. the event with the exact same sequence number will not be included
in the returned stream. This means that you can use the offset that is returned in ``EventEnvelope``
as the ``offset`` parameter in a subsequent query.
In addition to the ``offset`` the ``EventEnvelope`` also provides ``persistenceId`` and ``sequenceNr``
for each event. The ``sequenceNr`` is the sequence number for the persistent actor with the

View file

@ -51,7 +51,7 @@ class MyEventsByTagPublisher(tag: String, offset: Long, refreshInterval: FiniteD
private def statement() = connection.prepareStatement(
"""
SELECT id, persistent_repr FROM journal
WHERE tag = ? AND id >= ?
WHERE tag = ? AND id > ?
ORDER BY id LIMIT ?
""")

View file

@ -54,12 +54,24 @@ object PersistenceQueryDocSpec {
private val refreshInterval: FiniteDuration =
config.getDuration("refresh-interval", MILLISECONDS).millis
/**
* You can use `NoOffset` to retrieve all events with a given tag or retrieve a subset of all
* events by specifying a `Sequence` `offset`. The `offset` corresponds to an ordered sequence number for
* the specific tag. Note that the corresponding offset of each event is provided in the
* [[akka.persistence.query.EventEnvelope]], which makes it possible to resume the
* stream at a later point from a given offset.
*
* The `offset` is exclusive, i.e. the event with the exact same sequence number will not be included
* in the returned stream. This means that you can use the offset that is returned in `EventEnvelope`
* as the `offset` parameter in a subsequent query.
*/
override def eventsByTag(
tag: String, offset: Offset = Sequence(0L)): Source[EventEnvelope, NotUsed] = offset match {
case Sequence(offsetValue)
val props = MyEventsByTagPublisher.props(tag, offsetValue, refreshInterval)
Source.actorPublisher[EventEnvelope](props)
.mapMaterializedValue(_ => NotUsed)
case NoOffset => eventsByTag(tag, Sequence(0L)) //recursive
case _
throw new IllegalArgumentException("LevelDB does not support " + offset.getClass.getName + " offsets")
}

View file

@ -90,10 +90,14 @@ with the given ``tags``.
.. includecode:: code/docs/persistence/query/LeveldbPersistenceQueryDocSpec.scala#tagger
You can retrieve a subset of all events by specifying ``offset``, or use ``0L`` to retrieve all
events with a given tag. The ``offset`` corresponds to an ordered sequence number for the specific tag.
Note that the corresponding offset of each event is provided in the ``EventEnvelope``, which makes it possible
to resume the stream at a later point from a given offset.
You can use ``NoOffset`` to retrieve all events with a given tag or retrieve a subset of all
events by specifying a ``Sequence`` ``offset``. The ``offset`` corresponds to an ordered sequence number for
the specific tag. Note that the corresponding offset of each event is provided in the
``EventEnvelope``, which makes it possible to resume the stream at a later point from a given offset.
The ``offset`` is exclusive, i.e. the event with the exact same sequence number will not be included
in the returned stream. This means that you can use the offset that is returned in ``EventEnvelope``
as the ``offset`` parameter in a subsequent query.
In addition to the ``offset`` the ``EventEnvelope`` also provides ``persistenceId`` and ``sequenceNr``
for each event. The ``sequenceNr`` is the sequence number for the persistent actor with the

View file

@ -17,10 +17,28 @@ object Offset {
abstract class Offset
/**
* Corresponds to an ordered sequence number for the events. Note that the corresponding
* offset of each event is provided in the [[akka.persistence.query.EventEnvelope]],
* which makes it possible to resume the stream at a later point from a given offset.
*
* The `offset` is exclusive, i.e. the event with the exact same sequence number will not be included
* in the returned stream. This means that you can use the offset that is returned in `EventEnvelope`
* as the `offset` parameter in a subsequent query.
*/
final case class Sequence(value: Long) extends Offset with Ordered[Sequence] {
override def compare(that: Sequence): Int = value.compare(that.value)
}
/**
* Corresponds to an ordered unique identifier of the events. Note that the corresponding
* offset of each event is provided in the [[akka.persistence.query.EventEnvelope]],
* which makes it possible to resume the stream at a later point from a given offset.
*
* The `offset` is exclusive, i.e. the event with the exact same sequence number will not be included
* in the returned stream. This means that you can use the offset that is returned in `EventEnvelope`
* as the `offset` parameter in a subsequent query.
*/
final case class TimeBasedUUID(value: UUID) extends Offset with Ordered[TimeBasedUUID] {
if (value == null || value.version != 1) {
throw new IllegalArgumentException("UUID " + value + " is not a time-based UUID")
@ -29,6 +47,9 @@ final case class TimeBasedUUID(value: UUID) extends Offset with Ordered[TimeBase
override def compare(other: TimeBasedUUID): Int = value.compareTo(other.value)
}
/**
* Used when retrieving all events.
*/
final case object NoOffset extends Offset {
/**
* Java API:

View file

@ -78,7 +78,7 @@ private[akka] abstract class AbstractEventsByTagPublisher(
def receiveIdleRequest(): Unit
def timeForReplay: Boolean =
(buf.isEmpty || buf.size <= maxBufSize / 2) && (currOffset <= toOffset)
(buf.isEmpty || buf.size <= maxBufSize / 2) && (currOffset < toOffset)
def replay(): Unit = {
val limit = maxBufSize - buf.size
@ -94,7 +94,7 @@ private[akka] abstract class AbstractEventsByTagPublisher(
persistenceId = p.persistenceId,
sequenceNr = p.sequenceNr,
event = p.payload)
currOffset = offset + 1
currOffset = offset
deliverBuf()
case RecoverySuccess(highestSeqNr)
@ -143,13 +143,13 @@ private[akka] class LiveEventsByTagPublisher(
override def receiveIdleRequest(): Unit = {
deliverBuf()
if (buf.isEmpty && currOffset > toOffset)
if (buf.isEmpty && currOffset >= toOffset)
onCompleteThenStop()
}
override def receiveRecoverySuccess(highestSeqNr: Long): Unit = {
deliverBuf()
if (buf.isEmpty && currOffset > toOffset)
if (buf.isEmpty && currOffset >= toOffset)
onCompleteThenStop()
context.become(idle)
}
@ -174,7 +174,7 @@ private[akka] class CurrentEventsByTagPublisher(
override def receiveIdleRequest(): Unit = {
deliverBuf()
if (buf.isEmpty && currOffset > toOffset)
if (buf.isEmpty && currOffset >= toOffset)
onCompleteThenStop()
else
self ! Continue
@ -184,7 +184,7 @@ private[akka] class CurrentEventsByTagPublisher(
deliverBuf()
if (highestSeqNr < toOffset)
_toOffset = highestSeqNr
if (buf.isEmpty && (currOffset > toOffset || currOffset == fromOffset))
if (buf.isEmpty && (currOffset >= toOffset || currOffset == fromOffset))
onCompleteThenStop()
else
self ! Continue // more to fetch

View file

@ -116,6 +116,10 @@ class LeveldbReadJournal(scaladslReadJournal: akka.persistence.query.journal.lev
* `persistenceId` that persisted the event. The `persistenceId` + `sequenceNr` is an unique
* identifier for the event.
*
* The `offset` is exclusive, i.e. the event with the exact same sequence number will not be included
* in the returned stream. This means that you can use the offset that is returned in `EventEnvelope`
* as the `offset` parameter in a subsequent query.
*
* The returned event stream is ordered by the offset (tag sequence number), which corresponds
* to the same order as the write journal stored the events. The same stream elements (in same order)
* are returned for multiple executions of the query. Deleted events are not deleted from the

View file

@ -10,7 +10,7 @@ import akka.actor.ExtendedActorSystem
import akka.event.Logging
import akka.persistence.query.journal.leveldb.{ AllPersistenceIdsPublisher, EventsByPersistenceIdPublisher, EventsByTagPublisher }
import akka.persistence.query.scaladsl.{ ReadJournal, _ }
import akka.persistence.query.{ EventEnvelope, Offset, Sequence }
import akka.persistence.query.{ EventEnvelope, NoOffset, Offset, Sequence }
import akka.stream.scaladsl.Source
import akka.util.ByteString
import com.typesafe.config.Config
@ -128,12 +128,16 @@ class LeveldbReadJournal(system: ExtendedActorSystem, config: Config) extends Re
* To tag events you create an [[akka.persistence.journal.EventAdapter]] that wraps the events
* in a [[akka.persistence.journal.Tagged]] with the given `tags`.
*
* You can retrieve a subset of all events by specifying `offset`, or use `0L` to retrieve all
* events with a given tag. The `offset` corresponds to an ordered sequence number for
* You can use `NoOffset` to retrieve all events with a given tag or retrieve a subset of all
* events by specifying a `Sequence` `offset`. The `offset` corresponds to an ordered sequence number for
* the specific tag. Note that the corresponding offset of each event is provided in the
* [[akka.persistence.query.EventEnvelope]], which makes it possible to resume the
* stream at a later point from a given offset.
*
* The `offset` is exclusive, i.e. the event with the exact same sequence number will not be included
* in the returned stream. This means that you can use the offset that is returned in `EventEnvelope`
* as the `offset` parameter in a subsequent query.
*
* In addition to the `offset` the `EventEnvelope` also provides `persistenceId` and `sequenceNr`
* for each event. The `sequenceNr` is the sequence number for the persistent actor with the
* `persistenceId` that persisted the event. The `persistenceId` + `sequenceNr` is an unique
@ -163,7 +167,7 @@ class LeveldbReadJournal(system: ExtendedActorSystem, config: Config) extends Re
refreshInterval, maxBufSize, writeJournalPluginId))
.mapMaterializedValue(_ NotUsed)
.named("eventsByTag-" + URLEncoder.encode(tag, ByteString.UTF_8))
case NoOffset eventsByTag(tag, Sequence(0L)) //recursive
case _
throw new IllegalArgumentException("LevelDB does not support " + Logging.simpleName(offset.getClass) + " offsets")
}
@ -179,7 +183,7 @@ class LeveldbReadJournal(system: ExtendedActorSystem, config: Config) extends Re
Source.actorPublisher[EventEnvelope](EventsByTagPublisher.props(tag, seq.value, Long.MaxValue,
None, maxBufSize, writeJournalPluginId)).mapMaterializedValue(_ NotUsed)
.named("currentEventsByTag-" + URLEncoder.encode(tag, ByteString.UTF_8))
case NoOffset currentEventsByTag(tag, Sequence(0L)) //recursive
case _
throw new IllegalArgumentException("LevelDB does not support " + Logging.simpleName(offset.getClass) + " offsets")
}

View file

@ -13,6 +13,7 @@ import akka.stream.ActorMaterializer
import akka.stream.testkit.scaladsl.TestSink
import akka.testkit.AkkaSpec
import akka.testkit.ImplicitSender
import akka.persistence.query.NoOffset
object EventsByTagSpec {
val config = """
@ -72,7 +73,7 @@ class EventsByTagSpec extends AkkaSpec(EventsByTagSpec.config)
b ! "a green leaf"
expectMsg(s"a green leaf-done")
val greenSrc = queries.currentEventsByTag(tag = "green", offset = Sequence(0L))
val greenSrc = queries.currentEventsByTag(tag = "green", offset = NoOffset)
greenSrc.runWith(TestSink.probe[Any])
.request(2)
.expectNext(EventEnvelope(Sequence(1L), "a", 2L, "a green apple"))
@ -109,11 +110,11 @@ class EventsByTagSpec extends AkkaSpec(EventsByTagSpec.config)
.expectComplete() // green cucumber not seen
}
"find events from offset" in {
"find events from offset (exclusive)" in {
val greenSrc = queries.currentEventsByTag(tag = "green", offset = Sequence(2L))
val probe = greenSrc.runWith(TestSink.probe[Any])
.request(10)
.expectNext(EventEnvelope(Sequence(2L), "a", 3L, "a green banana"))
// note that banana is not included, since exclusive offset
.expectNext(EventEnvelope(Sequence(3L), "b", 2L, "a green leaf"))
.expectNext(EventEnvelope(Sequence(4L), "c", 1L, "a green cucumber"))
.expectComplete()
@ -124,7 +125,7 @@ class EventsByTagSpec extends AkkaSpec(EventsByTagSpec.config)
"find new events" in {
val d = system.actorOf(TestActor.props("d"))
val blackSrc = queries.eventsByTag(tag = "black", offset = Sequence(0L))
val blackSrc = queries.eventsByTag(tag = "black", offset = NoOffset)
val probe = blackSrc.runWith(TestSink.probe[Any])
.request(2)
.expectNext(EventEnvelope(Sequence(1L), "b", 1L, "a black car"))
@ -142,11 +143,11 @@ class EventsByTagSpec extends AkkaSpec(EventsByTagSpec.config)
.expectNext(EventEnvelope(Sequence(3L), "d", 2L, "a black night"))
}
"find events from offset" in {
"find events from offset (exclusive)" in {
val greenSrc = queries.eventsByTag(tag = "green", offset = Sequence(2L))
val probe = greenSrc.runWith(TestSink.probe[Any])
.request(10)
.expectNext(EventEnvelope(Sequence(2L), "a", 3L, "a green banana"))
// note that banana is not included, since exclusive offset
.expectNext(EventEnvelope(Sequence(3L), "b", 2L, "a green leaf"))
.expectNext(EventEnvelope(Sequence(4L), "c", 1L, "a green cucumber"))
.expectNoMsg(100.millis)

View file

@ -100,6 +100,10 @@ private[persistence] object LeveldbJournal {
final case class SubscribeTag(tag: String) extends SubscriptionCommand
final case class TaggedEventAppended(tag: String) extends DeadLetterSuppression
/**
* `fromSequenceNr` is exclusive
* `toSequenceNr` is inclusive
*/
final case class ReplayTaggedMessages(fromSequenceNr: Long, toSequenceNr: Long, max: Long,
tag: String, replyTo: ActorRef) extends SubscriptionCommand
final case class ReplayedTaggedMessage(persistent: PersistentRepr, tag: String, offset: Long)

View file

@ -100,7 +100,8 @@ private[persistence] trait LeveldbRecovery extends AsyncRecovery { this: Leveldb
}
withIterator { iter
val startKey = Key(tagNid, if (fromSequenceNr < 1L) 1L else fromSequenceNr, 0)
// fromSequenceNr is exclusive, i.e. start with +1
val startKey = Key(tagNid, if (fromSequenceNr < 1L) 1L else fromSequenceNr + 1, 0)
iter.seek(keyToBytes(startKey))
go(iter, startKey, 0L, replayCallback)
}

View file

@ -96,10 +96,10 @@ object SnapshotFailureRobustnessSpec {
}
class SnapshotFailureRobustnessSpec extends PersistenceSpec(PersistenceSpec.config("leveldb", "SnapshotFailureRobustnessSpec", serialization = "off", extraConfig = Some(
"""
akka.persistence.snapshot-store.local.class = "akka.persistence.SnapshotFailureRobustnessSpec$FailingLocalSnapshotStore"
akka.persistence.snapshot-store.local-delete-fail = ${akka.persistence.snapshot-store.local}
akka.persistence.snapshot-store.local-delete-fail.class = "akka.persistence.SnapshotFailureRobustnessSpec$DeleteFailingLocalSnapshotStore"
s"""
akka.persistence.snapshot-store.local.class = "akka.persistence.SnapshotFailureRobustnessSpec$$FailingLocalSnapshotStore"
akka.persistence.snapshot-store.local-delete-fail = $${akka.persistence.snapshot-store.local}
akka.persistence.snapshot-store.local-delete-fail.class = "akka.persistence.SnapshotFailureRobustnessSpec$$DeleteFailingLocalSnapshotStore"
"""))) with ImplicitSender {
import SnapshotFailureRobustnessSpec._

View file

@ -10,7 +10,7 @@ import akka.stream.impl.StreamLayout.Module
import akka.stream.impl._
import akka.stream.impl.fusing._
import akka.stream.stage._
import org.reactivestreams.{Processor, Publisher, Subscriber, Subscription}
import org.reactivestreams.{ Processor, Publisher, Subscriber, Subscription }
import scala.annotation.unchecked.uncheckedVariance
import scala.collection.immutable