Timestamp in EventEnvelope, #28331 (#28383)

* added to EventEnvolope and therefore include case class stuff
  for binary compatibility
* also added in PersistentRepr, which for example is the serialized format in
  LeveldbJournal
This commit is contained in:
Patrik Nordwall 2020-01-07 11:32:15 +01:00 committed by Arnout Engelen
parent 290a6137e0
commit efa856bc17
12 changed files with 287 additions and 34 deletions

View file

@ -4,8 +4,76 @@
package akka.persistence.query
import scala.runtime.AbstractFunction4
import akka.util.HashCode
// for binary compatibility (used to be a case class)
object EventEnvelope extends AbstractFunction4[Offset, String, Long, Any, EventEnvelope] {
def apply(offset: Offset, persistenceId: String, sequenceNr: Long, event: Any, timestamp: Long): EventEnvelope =
new EventEnvelope(offset, persistenceId, sequenceNr, event, timestamp)
override def apply(offset: Offset, persistenceId: String, sequenceNr: Long, event: Any): EventEnvelope =
new EventEnvelope(offset, persistenceId, sequenceNr, event)
def unapply(arg: EventEnvelope): Option[(Offset, String, Long, Any)] =
Some((arg.offset, arg.persistenceId, arg.sequenceNr, arg.timestamp))
}
/**
* Event wrapper adding meta data for the events in the result stream of
* [[akka.persistence.query.scaladsl.EventsByTagQuery]] query, or similar queries.
*
* The `timestamp` is the time the event was stored, in milliseconds since midnight, January 1, 1970 UTC
* (same as `System.currentTimeMillis`).
*/
final case class EventEnvelope(offset: Offset, persistenceId: String, sequenceNr: Long, event: Any)
final class EventEnvelope(
val offset: Offset,
val persistenceId: String,
val sequenceNr: Long,
val event: Any,
val timestamp: Long)
extends Product4[Offset, String, Long, Any]
with Serializable {
// for binary compatibility
def this(offset: Offset, persistenceId: String, sequenceNr: Long, event: Any) =
this(offset, persistenceId, sequenceNr, event, 0L)
override def hashCode(): Int = {
var result = HashCode.SEED
result = HashCode.hash(result, offset)
result = HashCode.hash(result, persistenceId)
result = HashCode.hash(result, sequenceNr)
result = HashCode.hash(result, event)
result
}
override def equals(obj: Any): Boolean = obj match {
case other: EventEnvelope =>
offset == other.offset && persistenceId == other.persistenceId && sequenceNr == other.sequenceNr &&
event == other.event // timestamp not included in equals for backwards compatibility
case _ => false
}
override def toString: String =
s"EventEnvelope($offset,$persistenceId,$sequenceNr,$event,$timestamp)"
// for binary compatibility (used to be a case class)
def copy(
offset: Offset = this.offset,
persistenceId: String = this.persistenceId,
sequenceNr: Long = this.sequenceNr,
event: Any = this.event): EventEnvelope =
new EventEnvelope(offset, persistenceId, sequenceNr, event, timestamp)
// Product4, for binary compatibility (used to be a case class)
override def productPrefix = "EventEnvelope"
override def _1: Offset = offset
override def _2: String = persistenceId
override def _3: Long = sequenceNr
override def _4: Any = event
override def canEqual(that: Any): Boolean = that.isInstanceOf[EventEnvelope]
}

View file

@ -108,7 +108,8 @@ final private[akka] class EventsByPersistenceIdStage(
offset = Sequence(pr.sequenceNr),
persistenceId = pr.persistenceId,
sequenceNr = pr.sequenceNr,
event = pr.payload))
event = pr.payload,
timestamp = pr.timestamp))
nextSequenceNr = pr.sequenceNr + 1
deliverBuf(out)

View file

@ -106,7 +106,8 @@ final private[leveldb] class EventsByTagStage(
offset = Sequence(offset),
persistenceId = p.persistenceId,
sequenceNr = p.sequenceNr,
event = p.payload))
event = p.payload,
timestamp = p.timestamp))
currOffset = offset
deliverBuf(out)

View file

@ -11,9 +11,10 @@ import akka.persistence.query.scaladsl.EventsByTagQuery
import akka.stream.testkit.scaladsl.TestSink
import akka.testkit.AkkaSpec
import akka.testkit.ImplicitSender
import scala.concurrent.duration._
import akka.persistence.query.EventEnvelope
object EventsByPersistenceIdSpec {
val config = """
akka.loglevel = INFO
@ -143,6 +144,19 @@ class EventsByPersistenceIdSpec extends AkkaSpec(EventsByPersistenceIdSpec.confi
src.map(_.event).runWith(TestSink.probe[Any]).request(1).expectComplete()
}
"include timestamp in EventEnvelope" in {
setup("m")
val src = queries.currentEventsByPersistenceId("m", 0L, Long.MaxValue)
val probe = src.runWith(TestSink.probe[EventEnvelope])
probe.request(5)
probe.expectNext().timestamp should be > 0L
probe.expectNext().timestamp should be > 0L
probe.expectNext().timestamp should be > 0L
probe.expectComplete()
}
}
"Leveldb live query EventsByPersistenceId" must {
@ -179,6 +193,18 @@ class EventsByPersistenceIdSpec extends AkkaSpec(EventsByPersistenceIdSpec.confi
probe.expectNoMessage(100.millis).request(5).expectNext("e-3").expectNext("e-4")
}
"include timestamp in EventEnvelope" in {
setup("n")
val src = queries.eventsByPersistenceId("n", 0L, Long.MaxValue)
val probe = src.runWith(TestSink.probe[EventEnvelope])
probe.request(5)
probe.expectNext().timestamp should be > 0L
probe.expectNext().timestamp should be > 0L
probe.cancel()
}
}
}

View file

@ -157,6 +157,17 @@ class EventsByTagSpec extends AkkaSpec(EventsByTagSpec.config) with Cleanup with
.expectComplete()
}
"include timestamp in EventEnvelope" in {
system.actorOf(TestActor.props("testTimestamp"))
val greenSrc = queries.currentEventsByTag(tag = "green", offset = Sequence(0L))
val probe = greenSrc.runWith(TestSink.probe[EventEnvelope])
probe.request(2)
probe.expectNext().timestamp should be > 0L
probe.expectNext().timestamp should be > 0L
probe.cancel()
}
}
"Leveldb live query EventsByTag" must {