New Persistence Query interfaces for bySlices queries (#30882)

* addition of TimestampOffset
* ApiMayChange
* small mention in docs

eventsBySlices is intended to be a better way to retrieve all events for an entity type
than eventsByTag.

The usage of `eventsByTag` for Projections has the major drawback that the number of tags
must be decided up-front and can't easily be changed afterwards. Starting with too many
tags means a lot of overhead, since many projection instances would be running on each node
in a small Akka Cluster, with each projection instance polling the database periodically.
Starting with too few tags means that the load can't be spread over more Akka nodes later.

Instead of tags we can store a slice number derived by hashing the persistence id,
for example `math.abs(persistenceId.hashCode % numberOfSlices)`.

Then the Projection query can be a range query over the slices. For example, if using 128
slices and running 4 Projection instances, the slice ranges would be 0-31, 32-63, 64-95,
96-127. That can easily be split into more Projection instances when needed, while still
reusing the offsets from the previous range distribution.
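
A minimal sketch of the idea (illustrative only; `numberOfSlices = 128` and the even split
are assumptions mirroring the `sliceRanges` methods introduced below):

```scala
object SliceExample {
  val numberOfSlices = 128

  // deterministic slice for a persistence id, as described above
  def sliceForPersistenceId(persistenceId: String): Int =
    math.abs(persistenceId.hashCode % numberOfSlices)

  // split all slices into `numberOfRanges` evenly sized ranges,
  // one range per Projection instance
  def sliceRanges(numberOfRanges: Int): Seq[Range] = {
    require(numberOfSlices % numberOfRanges == 0)
    val rangeSize = numberOfSlices / numberOfRanges
    (0 until numberOfRanges).map(i => i * rangeSize until (i + 1) * rangeSize)
  }
}

// SliceExample.sliceRanges(4).map(r => s"${r.min}-${r.max}")
//   => Vector("0-31", "32-63", "64-95", "96-127")
```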
Patrik Nordwall 2021-11-15 17:31:23 +01:00 committed by GitHub
parent 060fce1720
commit 20b684f04f
14 changed files with 631 additions and 0 deletions

@@ -169,6 +169,13 @@ that is able to order events by insertion time it could treat the Long as a time
If your usage does not require a live stream, you can use the `currentEventsByTag` query.
#### EventsBySlice and CurrentEventsBySlice
Query events for a given entity type and slices. A slice is deterministically defined based on the persistence id.
The purpose is to evenly distribute all persistence ids over the slices.
See @apidoc[akka.persistence.query.typed.*.EventsBySliceQuery] and @apidoc[akka.persistence.query.typed.*.CurrentEventsBySliceQuery].
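A minimal usage sketch (the plugin id `"my.journal.query"` and entity type `"MyEntity"` are placeholders, to be replaced with a journal plugin that implements the query):

```scala
import akka.actor.ActorSystem
import akka.persistence.query.{ NoOffset, PersistenceQuery }
import akka.persistence.query.typed.scaladsl.EventsBySliceQuery
import akka.stream.scaladsl.Sink

implicit val system: ActorSystem = ActorSystem("example")

// "my.journal.query" is a placeholder plugin id
val readJournal =
  PersistenceQuery(system).readJournalFor[EventsBySliceQuery]("my.journal.query")

// consume the first of 4 slice ranges, from the beginning of the stream
val range = readJournal.sliceRanges(numberOfRanges = 4).head
readJournal
  .eventsBySlices[String]("MyEntity", range.min, range.max, NoOffset)
  .runWith(Sink.foreach(env => println(s"${env.persistenceId} seqNr=${env.sequenceNr}")))
```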
### Materialized values of queries
Journals are able to provide additional information related to a query by exposing @ref:[Materialized values](stream/stream-quickstart.md#materialized-values-quick),

@@ -4,8 +4,10 @@
package akka.persistence.query
import java.time.Instant
import java.util.UUID
import akka.annotation.ApiMayChange
import akka.util.UUIDComparator
object Offset {
@@ -14,6 +16,7 @@ object Offset {
def noOffset: Offset = NoOffset
def sequence(value: Long): Offset = Sequence(value)
def timeBasedUUID(uuid: UUID): Offset = TimeBasedUUID(uuid)
def timestamp(instant: Instant): TimestampOffset = TimestampOffset(instant, Map.empty)
}
@@ -49,6 +52,63 @@ final case class TimeBasedUUID(value: UUID) extends Offset with Ordered[TimeBase
override def compare(other: TimeBasedUUID): Int = UUIDComparator.comparator.compare(value, other.value)
}
object TimestampOffset {
val Zero: TimestampOffset = TimestampOffset(Instant.EPOCH, Instant.EPOCH, Map.empty)
def apply(timestamp: Instant, seen: Map[String, Long]): TimestampOffset =
TimestampOffset(timestamp, Instant.EPOCH, seen)
/**
* Try to convert the Offset to a TimestampOffset. Epoch timestamp is used for `NoOffset`.
*/
def toTimestampOffset(offset: Offset): TimestampOffset = {
offset match {
case t: TimestampOffset => t
case NoOffset => TimestampOffset.Zero
case null => throw new IllegalArgumentException("Offset must not be null")
case other =>
throw new IllegalArgumentException(
s"Supported offset types are TimestampOffset and NoOffset, " +
s"received ${other.getClass.getName}")
}
}
}
/**
* Timestamp based offset. Since there can be several events for the same timestamp it keeps
* track of which sequence nrs have been seen for every persistence id at this specific timestamp.
*
* The `offset` is exclusive, i.e. the event with the exact same sequence number will not be included
* in the returned stream. This means that you can use the offset that is returned in `EventEnvelope`
* as the `offset` parameter in a subsequent query.
*
* API May Change
*
* @param timestamp
* time when the event was stored, microsecond granularity database timestamp
* @param readTimestamp
* time when the event was read, microsecond granularity database timestamp
* @param seen
* List of sequence nrs for every persistence id seen at this timestamp
*/
@ApiMayChange
final case class TimestampOffset(timestamp: Instant, readTimestamp: Instant, seen: Map[String, Long]) extends Offset {
/** Java API */
def getSeen(): java.util.Map[String, java.lang.Long] = {
import akka.util.ccompat.JavaConverters._
seen.map { case (pid, seqNr) => pid -> java.lang.Long.valueOf(seqNr) }.asJava
}
override def hashCode(): Int = timestamp.hashCode()
override def equals(obj: Any): Boolean =
obj match {
case other: TimestampOffset => timestamp == other.timestamp && seen == other.seen
case _ => false
}
}
/**
* Used when retrieving all events.
*/
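
A sketch of the offset semantics above; `alreadySeen` is an illustrative helper, not part of this commit, and the persistence ids are made up:

```scala
import java.time.Instant
import akka.persistence.query.{ NoOffset, TimestampOffset }

// two events stored in the same microsecond, tracked by the `seen` map
val t = Instant.parse("2021-11-15T16:31:23.123456Z")
val offset = TimestampOffset(t, Map("Entity|a" -> 5L, "Entity|b" -> 17L))

// NoOffset converts to the epoch based Zero offset
assert(TimestampOffset.toTimestampOffset(NoOffset) == TimestampOffset.Zero)

// illustrative helper: when resuming from `offset`, an event with the same
// timestamp is a duplicate if its seqNr is not greater than the seen one
def alreadySeen(off: TimestampOffset, pid: String, seqNr: Long): Boolean =
  off.seen.get(pid).exists(seqNr <= _)

assert(alreadySeen(offset, "Entity|a", 5L))
assert(!alreadySeen(offset, "Entity|a", 6L))
```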

@@ -0,0 +1,116 @@
/*
* Copyright (C) 2021 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.query.typed
import java.util.Optional
import akka.annotation.ApiMayChange
import akka.persistence.query.Offset
import akka.util.HashCode
object EventEnvelope {
def apply[Event](
offset: Offset,
persistenceId: String,
sequenceNr: Long,
event: Event,
timestamp: Long,
entityType: String,
slice: Int): EventEnvelope[Event] =
new EventEnvelope(offset, persistenceId, sequenceNr, Option(event), timestamp, None, entityType, slice)
def create[Event](
offset: Offset,
persistenceId: String,
sequenceNr: Long,
event: Event,
timestamp: Long,
entityType: String,
slice: Int): EventEnvelope[Event] =
apply(offset, persistenceId, sequenceNr, event, timestamp, entityType, slice)
def unapply[Event](arg: EventEnvelope[Event]): Option[(Offset, String, Long, Option[Event], Long)] =
Some((arg.offset, arg.persistenceId, arg.sequenceNr, arg.eventOption, arg.timestamp))
}
/**
* Event wrapper adding meta data for the events in the result stream of
* [[akka.persistence.query.typed.scaladsl.EventsBySliceQuery]] query, or similar queries.
*
* If the `event` is not defined it has not been loaded yet. It can be loaded with
* [[akka.persistence.query.typed.scaladsl.LoadEventQuery]].
*
* The `timestamp` is the time the event was stored, in milliseconds since midnight, January 1, 1970 UTC (same as
* `System.currentTimeMillis`).
*
* It is an improved `EventEnvelope` compared to [[akka.persistence.query.EventEnvelope]].
*
* API May Change
*/
@ApiMayChange
final class EventEnvelope[Event](
val offset: Offset,
val persistenceId: String,
val sequenceNr: Long,
val eventOption: Option[Event],
val timestamp: Long,
val eventMetadata: Option[Any],
val entityType: String,
val slice: Int) {
def event: Event =
eventOption match {
case Some(evt) => evt
case None =>
throw new IllegalStateException(
"Event was not loaded. Use eventOption and load the event on demand with LoadEventQuery.")
}
/**
* Java API
*/
def getEvent(): Event =
eventOption match {
case Some(evt) => evt
case None =>
throw new IllegalStateException(
"Event was not loaded. Use getOptionalEvent and load the event on demand with LoadEventQuery.")
}
/**
* Java API
*/
def getOptionalEvent(): Optional[Event] = {
import scala.compat.java8.OptionConverters._
eventOption.asJava
}
/**
* Java API
*/
def getEventMetaData(): Optional[AnyRef] = {
import scala.compat.java8.OptionConverters._
eventMetadata.map(_.asInstanceOf[AnyRef]).asJava
}
override def hashCode(): Int = {
var result = HashCode.SEED
result = HashCode.hash(result, offset)
result = HashCode.hash(result, persistenceId)
result = HashCode.hash(result, sequenceNr)
result
}
override def equals(obj: Any): Boolean = obj match {
case other: EventEnvelope[_] =>
offset == other.offset && persistenceId == other.persistenceId && sequenceNr == other.sequenceNr &&
eventOption == other.eventOption && timestamp == other.timestamp && eventMetadata == other.eventMetadata &&
entityType == other.entityType && slice == other.slice
case _ => false
}
override def toString: String =
s"EventBySliceEnvelope($offset,$persistenceId,$sequenceNr,$eventOption,$timestamp,$eventMetadata,$entityType,$slice)"
}
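
A small sketch of how the envelope is meant to be used (all values made up):

```scala
import akka.persistence.query.NoOffset
import akka.persistence.query.typed.EventEnvelope

val env = EventEnvelope[String](
  offset = NoOffset,
  persistenceId = "MyEntity|a",
  sequenceNr = 1L,
  event = "event-1",
  timestamp = System.currentTimeMillis(),
  entityType = "MyEntity",
  slice = 7)

// `event` throws IllegalStateException if the event was not loaded;
// prefer `eventOption` when the journal may emit envelopes without the event
env.eventOption.foreach(println)

// the extractor exposes the most commonly used fields
env match {
  case EventEnvelope(offset, pid, seqNr, _, _) =>
    println(s"$pid seqNr=$seqNr offset=$offset")
}
```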

@@ -0,0 +1,38 @@
/*
* Copyright (C) 2021 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.query.typed.javadsl
import akka.NotUsed
import akka.annotation.ApiMayChange
import akka.japi.Pair
import akka.persistence.query.Offset
import akka.persistence.query.javadsl.ReadJournal
import akka.persistence.query.typed.EventEnvelope
import akka.stream.javadsl.Source
/**
* A plugin may optionally support this query by implementing this trait.
*
* API May Change
*/
@ApiMayChange
trait CurrentEventsBySliceQuery extends ReadJournal {
/**
* Same type of query as [[EventsBySliceQuery.eventsBySlices]] but the event stream is completed immediately when it
* reaches the end of the "result set". Depending on journal implementation, this may mean all events up to when the
* query is started, or it may include events that are persisted while the query is still streaming results. For
* eventually consistent stores, it may only include all events up to some point before the query is started.
*/
def currentEventsBySlices[Event](
entityType: String,
minSlice: Int,
maxSlice: Int,
offset: Offset): Source[EventEnvelope[Event], NotUsed]
def sliceForPersistenceId(persistenceId: String): Int
def sliceRanges(numberOfRanges: Int): java.util.List[Pair[Integer, Integer]]
}

@@ -0,0 +1,72 @@
/*
* Copyright (C) 2021 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.query.typed.javadsl
import akka.NotUsed
import akka.annotation.ApiMayChange
import akka.japi.Pair
import akka.persistence.query.DurableStateChange
import akka.persistence.query.Offset
import akka.persistence.state.javadsl.DurableStateStore
import akka.stream.javadsl.Source
/**
* Query API for reading durable state objects.
*
* For Scala API see [[akka.persistence.query.typed.scaladsl.DurableStateStoreBySliceQuery]].
*
* API May Change
*/
@ApiMayChange
trait DurableStateStoreBySliceQuery[A] extends DurableStateStore[A] {
/**
* Get a source of the most recent changes made to objects with the given slice range since the passed in offset.
*
* A slice is deterministically defined based on the persistence id. The purpose is to evenly distribute all
* persistence ids over the slices.
*
* Note that this only returns the most recent change to each object; if an object has been updated multiple times
* since the offset, only the most recent of those changes will be part of the stream.
*
* This will return changes that occurred up to when the `Source` returned by this call is materialized. Changes to
* objects made since materialization are not guaranteed to be included in the results.
*
* The [[DurableStateChange]] elements can be [[akka.persistence.query.UpdatedDurableState]] or `DeletedDurableState`.
* `DeletedDurableState` is not implemented yet, see issue https://github.com/akka/akka/issues/30446.
*/
def currentChangesBySlices(
entityType: String,
minSlice: Int,
maxSlice: Int,
offset: Offset): Source[DurableStateChange[A], NotUsed]
/**
* Get a source of the most recent changes made to objects of the given slice range since the passed in offset.
*
* A slice is deterministically defined based on the persistence id. The purpose is to evenly distribute all
* persistence ids over the slices.
*
* The returned source will never terminate; it effectively watches for changes to the objects and emits changes as
* they happen.
*
* Not all changes that occur are guaranteed to be emitted; this call only guarantees that eventually, the most recent
* change for each object since the offset will be emitted. In particular, multiple updates to a given object in quick
* succession are likely to be skipped, with only the last update resulting in a change from this source.
*
* The [[DurableStateChange]] elements can be [[akka.persistence.query.UpdatedDurableState]] or `DeletedDurableState`.
* `DeletedDurableState` is not implemented yet, see issue https://github.com/akka/akka/issues/30446.
*/
def changesBySlices(
entityType: String,
minSlice: Int,
maxSlice: Int,
offset: Offset): Source[DurableStateChange[A], NotUsed]
def sliceForPersistenceId(persistenceId: String): Int
def sliceRanges(numberOfRanges: Int): java.util.List[Pair[Integer, Integer]]
}

@@ -0,0 +1,24 @@
/*
* Copyright (C) 2021 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.query.typed.javadsl
import java.time.Instant
import java.util.Optional
import java.util.concurrent.CompletionStage
import akka.annotation.ApiMayChange
import akka.persistence.query.javadsl.ReadJournal
/**
* An [[EventsBySliceQuery]] that is using a timestamp based offset should also implement this query.
*
* API May Change
*/
@ApiMayChange
trait EventTimestampQuery extends ReadJournal {
def timestampOf(persistenceId: String, sequenceNr: Long): CompletionStage[Optional[Instant]]
}

@@ -0,0 +1,62 @@
/*
* Copyright (C) 2021 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.query.typed.javadsl
import akka.NotUsed
import akka.annotation.ApiMayChange
import akka.japi.Pair
import akka.persistence.query.Offset
import akka.persistence.query.javadsl.ReadJournal
import akka.persistence.query.typed.EventEnvelope
import akka.stream.javadsl.Source
/**
* A plugin may optionally support this query by implementing this trait.
*
* An `EventsBySliceQuery` that is using a timestamp based offset should also implement [[EventTimestampQuery]] and
* [[LoadEventQuery]].
*
* API May Change
*/
@ApiMayChange
trait EventsBySliceQuery extends ReadJournal {
/**
* Query events for given slices. A slice is deterministically defined based on the persistence id. The purpose is to
* evenly distribute all persistence ids over the slices.
*
* The consumer can keep track of its current position in the event stream by storing the `offset` and restart the
* query from a given `offset` after a crash/restart.
*
* The exact meaning of the `offset` depends on the journal and must be documented by the read journal plugin. It may
* be a sequential id number that uniquely identifies the position of each event within the event stream. Distributed
* data stores cannot easily support those semantics and they may use a weaker meaning. For example it may be a
* timestamp (taken when the event was created or stored). Timestamps are not unique and not strictly ordered, since
* clocks on different machines may not be synchronized.
*
* In strongly consistent stores, where the `offset` is unique and strictly ordered, the stream should start from the
* next event after the `offset`. Otherwise, the read journal should ensure that between an invocation that returned
* an event with the given `offset`, and this invocation, no events are missed. Depending on the journal
* implementation, this may mean that this invocation will return events that were already returned by the previous
* invocation, including the event with the passed in `offset`.
*
* The returned event stream should be ordered by `offset` if possible, but this can also be difficult to fulfill for
* a distributed data store. The order must be documented by the read journal plugin.
*
* The stream is not completed when it reaches the end of the currently stored events, but it continues to push new
* events when new events are persisted. A corresponding query that is completed when it reaches the end of the
* currently stored events is provided by [[CurrentEventsBySliceQuery.currentEventsBySlices]].
*/
def eventsBySlices[Event](
entityType: String,
minSlice: Int,
maxSlice: Int,
offset: Offset): Source[EventEnvelope[Event], NotUsed]
def sliceForPersistenceId(persistenceId: String): Int
def sliceRanges(numberOfRanges: Int): java.util.List[Pair[Integer, Integer]]
}

@@ -0,0 +1,26 @@
/*
* Copyright (C) 2021 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.query.typed.javadsl
import java.util.concurrent.CompletionStage
import akka.annotation.ApiMayChange
import akka.persistence.query.javadsl.ReadJournal
import akka.persistence.query.typed.EventEnvelope
/**
* An [[EventsBySliceQuery]] that is using a timestamp based offset should also implement this query.
*
* API May Change
*/
@ApiMayChange
trait LoadEventQuery extends ReadJournal {
/**
* Load a single event on demand. The `CompletionStage` is completed with an `IllegalArgumentException` if
* the event for the given `persistenceId` and `sequenceNr` doesn't exist.
*/
def loadEnvelope[Event](persistenceId: String, sequenceNr: Long): CompletionStage[EventEnvelope[Event]]
}

@@ -0,0 +1,39 @@
/*
* Copyright (C) 2021 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.query.typed.scaladsl
import scala.collection.immutable
import akka.NotUsed
import akka.annotation.ApiMayChange
import akka.persistence.query.Offset
import akka.persistence.query.scaladsl.ReadJournal
import akka.persistence.query.typed.EventEnvelope
import akka.stream.scaladsl.Source
/**
* A plugin may optionally support this query by implementing this trait.
*
* API May Change
*/
@ApiMayChange
trait CurrentEventsBySliceQuery extends ReadJournal {
/**
* Same type of query as [[EventsBySliceQuery.eventsBySlices]] but the event stream is completed immediately when it
* reaches the end of the "result set". Depending on journal implementation, this may mean all events up to when the
* query is started, or it may include events that are persisted while the query is still streaming results. For
* eventually consistent stores, it may only include all events up to some point before the query is started.
*/
def currentEventsBySlices[Event](
entityType: String,
minSlice: Int,
maxSlice: Int,
offset: Offset): Source[EventEnvelope[Event], NotUsed]
def sliceForPersistenceId(persistenceId: String): Int
def sliceRanges(numberOfRanges: Int): immutable.Seq[Range]
}
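
A sketch of a catch-up read with the current query, assuming a `String` event type:

```scala
import scala.concurrent.Future
import akka.actor.ActorSystem
import akka.persistence.query.NoOffset
import akka.persistence.query.typed.EventEnvelope
import akka.persistence.query.typed.scaladsl.CurrentEventsBySliceQuery
import akka.stream.scaladsl.Sink

// drain everything currently stored for a single slice range;
// the stream completes at the end of the "result set"
def catchUp(journal: CurrentEventsBySliceQuery, entityType: String)(
    implicit system: ActorSystem): Future[Seq[EventEnvelope[String]]] = {
  val range = journal.sliceRanges(numberOfRanges = 1).head
  journal
    .currentEventsBySlices[String](entityType, range.min, range.max, NoOffset)
    .runWith(Sink.seq)
}
```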

@@ -0,0 +1,73 @@
/*
* Copyright (C) 2021 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.query.typed.scaladsl
import scala.collection.immutable
import akka.NotUsed
import akka.annotation.ApiMayChange
import akka.persistence.query.DurableStateChange
import akka.persistence.query.Offset
import akka.persistence.state.scaladsl.DurableStateStore
import akka.stream.scaladsl.Source
/**
* Query API for reading durable state objects.
*
* For Java API see [[akka.persistence.query.typed.javadsl.DurableStateStoreBySliceQuery]].
*
* API May Change
*/
@ApiMayChange
trait DurableStateStoreBySliceQuery[A] extends DurableStateStore[A] {
/**
* Get a source of the most recent changes made to objects with the given slice range since the passed in offset.
*
* A slice is deterministically defined based on the persistence id. The purpose is to evenly distribute all
* persistence ids over the slices.
*
* Note that this only returns the most recent change to each object; if an object has been updated multiple times
* since the offset, only the most recent of those changes will be part of the stream.
*
* This will return changes that occurred up to when the `Source` returned by this call is materialized. Changes to
* objects made since materialization are not guaranteed to be included in the results.
*
* The [[DurableStateChange]] elements can be [[akka.persistence.query.UpdatedDurableState]] or `DeletedDurableState`.
* `DeletedDurableState` is not implemented yet, see issue https://github.com/akka/akka/issues/30446.
*/
def currentChangesBySlices(
entityType: String,
minSlice: Int,
maxSlice: Int,
offset: Offset): Source[DurableStateChange[A], NotUsed]
/**
* Get a source of the most recent changes made to objects of the given slice range since the passed in offset.
*
* A slice is deterministically defined based on the persistence id. The purpose is to evenly distribute all
* persistence ids over the slices.
*
* The returned source will never terminate; it effectively watches for changes to the objects and emits changes as
* they happen.
*
* Not all changes that occur are guaranteed to be emitted; this call only guarantees that eventually, the most recent
* change for each object since the offset will be emitted. In particular, multiple updates to a given object in quick
* succession are likely to be skipped, with only the last update resulting in a change from this source.
*
* The [[DurableStateChange]] elements can be [[akka.persistence.query.UpdatedDurableState]] or `DeletedDurableState`.
* `DeletedDurableState` is not implemented yet, see issue https://github.com/akka/akka/issues/30446.
*/
def changesBySlices(
entityType: String,
minSlice: Int,
maxSlice: Int,
offset: Offset): Source[DurableStateChange[A], NotUsed]
def sliceForPersistenceId(persistenceId: String): Int
def sliceRanges(numberOfRanges: Int): immutable.Seq[Range]
}
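
A sketch of consuming the change stream; the 0-127 slice range assumes 128 slices, and the `String` state type is made up:

```scala
import scala.concurrent.Future
import akka.Done
import akka.actor.ActorSystem
import akka.persistence.query.{ NoOffset, UpdatedDurableState }
import akka.persistence.query.typed.scaladsl.DurableStateStoreBySliceQuery
import akka.stream.scaladsl.Sink

// until DeletedDurableState is implemented (issue #30446) every
// element is an UpdatedDurableState
def watchChanges(store: DurableStateStoreBySliceQuery[String], entityType: String)(
    implicit system: ActorSystem): Future[Done] =
  store
    .changesBySlices(entityType, 0, 127, NoOffset) // slices 0-127
    .runWith(Sink.foreach {
      case upd: UpdatedDurableState[String] =>
        println(s"${upd.persistenceId} revision=${upd.revision}: ${upd.value}")
      case other =>
        println(s"unexpected change: $other")
    })
```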

@@ -0,0 +1,24 @@
/*
* Copyright (C) 2021 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.query.typed.scaladsl
import java.time.Instant
import scala.concurrent.Future
import akka.annotation.ApiMayChange
import akka.persistence.query.scaladsl.ReadJournal
/**
* An [[EventsBySliceQuery]] that is using a timestamp based offset should also implement this query.
*
* API May Change
*/
@ApiMayChange
trait EventTimestampQuery extends ReadJournal {
def timestampOf(persistenceId: String, sequenceNr: Long): Future[Option[Instant]]
}
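
A small usage sketch (the `journal` parameter stands for any plugin implementing the trait):

```scala
import scala.concurrent.{ ExecutionContext, Future }
import akka.persistence.query.typed.scaladsl.EventTimestampQuery

// e.g. to find out when a given event was stored
def describeStoredTime(journal: EventTimestampQuery, pid: String, seqNr: Long)(
    implicit ec: ExecutionContext): Future[String] =
  journal.timestampOf(pid, seqNr).map {
    case Some(instant) => s"$pid seqNr=$seqNr was stored at $instant"
    case None          => s"$pid seqNr=$seqNr not found"
  }
```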

@@ -0,0 +1,63 @@
/*
* Copyright (C) 2021 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.query.typed.scaladsl
import scala.collection.immutable
import akka.NotUsed
import akka.annotation.ApiMayChange
import akka.persistence.query.Offset
import akka.persistence.query.scaladsl.ReadJournal
import akka.persistence.query.typed.EventEnvelope
import akka.stream.scaladsl.Source
/**
* A plugin may optionally support this query by implementing this trait.
*
* An `EventsBySliceQuery` that is using a timestamp based offset should also implement [[EventTimestampQuery]] and
* [[LoadEventQuery]].
*
* API May Change
*/
@ApiMayChange
trait EventsBySliceQuery extends ReadJournal {
/**
* Query events for given slices. A slice is deterministically defined based on the persistence id. The purpose is to
* evenly distribute all persistence ids over the slices.
*
* The consumer can keep track of its current position in the event stream by storing the `offset` and restart the
* query from a given `offset` after a crash/restart.
*
* The exact meaning of the `offset` depends on the journal and must be documented by the read journal plugin. It may
* be a sequential id number that uniquely identifies the position of each event within the event stream. Distributed
* data stores cannot easily support those semantics and they may use a weaker meaning. For example it may be a
* timestamp (taken when the event was created or stored). Timestamps are not unique and not strictly ordered, since
* clocks on different machines may not be synchronized.
*
* In strongly consistent stores, where the `offset` is unique and strictly ordered, the stream should start from the
* next event after the `offset`. Otherwise, the read journal should ensure that between an invocation that returned
* an event with the given `offset`, and this invocation, no events are missed. Depending on the journal
* implementation, this may mean that this invocation will return events that were already returned by the previous
* invocation, including the event with the passed in `offset`.
*
* The returned event stream should be ordered by `offset` if possible, but this can also be difficult to fulfill for
* a distributed data store. The order must be documented by the read journal plugin.
*
* The stream is not completed when it reaches the end of the currently stored events, but it continues to push new
* events when new events are persisted. A corresponding query that is completed when it reaches the end of the
* currently stored events is provided by [[CurrentEventsBySliceQuery.currentEventsBySlices]].
*/
def eventsBySlices[Event](
entityType: String,
minSlice: Int,
maxSlice: Int,
offset: Offset): Source[EventEnvelope[Event], NotUsed]
def sliceForPersistenceId(persistenceId: String): Int
def sliceRanges(numberOfRanges: Int): immutable.Seq[Range]
}
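
A sketch of resuming the live query from a stored offset; `handleEvent` and `saveOffset` are hypothetical application callbacks, stubbed out here:

```scala
import scala.concurrent.Future
import akka.Done
import akka.actor.ActorSystem
import akka.persistence.query.Offset
import akka.persistence.query.typed.EventEnvelope
import akka.persistence.query.typed.scaladsl.EventsBySliceQuery
import akka.stream.scaladsl.Sink

// hypothetical application callbacks
def handleEvent(env: EventEnvelope[String]): Unit = ()
def saveOffset(offset: Offset): Unit = ()

// since a timestamp based offset may replay already emitted events,
// handleEvent should be idempotent (or deduplicate via TimestampOffset.seen)
def runProjection(journal: EventsBySliceQuery, entityType: String, storedOffset: Offset)(
    implicit system: ActorSystem): Future[Done] = {
  val range = journal.sliceRanges(numberOfRanges = 4).head
  journal
    .eventsBySlices[String](entityType, range.min, range.max, storedOffset)
    .runWith(Sink.foreach { env =>
      handleEvent(env)
      saveOffset(env.offset) // the envelope's offset can be stored directly
    })
}
```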

@@ -0,0 +1,26 @@
/*
* Copyright (C) 2021 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.query.typed.scaladsl
import scala.concurrent.Future
import akka.annotation.ApiMayChange
import akka.persistence.query.scaladsl.ReadJournal
import akka.persistence.query.typed.EventEnvelope
/**
* An [[EventsBySliceQuery]] that is using a timestamp based offset should also implement this query.
*
* API May Change
*/
@ApiMayChange
trait LoadEventQuery extends ReadJournal {
/**
* Load a single event on demand. The `Future` is completed with an `IllegalArgumentException` if
* the event for the given `persistenceId` and `sequenceNr` doesn't exist.
*/
def loadEnvelope[Event](persistenceId: String, sequenceNr: Long): Future[EventEnvelope[Event]]
}
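
A small sketch of loading an event on demand and translating the failure to an `Option`:

```scala
import scala.concurrent.{ ExecutionContext, Future }
import akka.persistence.query.typed.scaladsl.LoadEventQuery

// a missing event fails the Future with IllegalArgumentException
def tryLoadEvent[E](journal: LoadEventQuery, pid: String, seqNr: Long)(
    implicit ec: ExecutionContext): Future[Option[E]] =
  journal
    .loadEnvelope[E](pid, seqNr)
    .map(env => Some(env.event))
    .recover { case _: IllegalArgumentException => None }
```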

@@ -34,4 +34,5 @@ class OffsetSpec extends AnyWordSpecLike with Matchers {
Random.shuffle(sequenceBasedList).sorted shouldEqual sequenceBasedList
}
}
}