diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/EventEnvelope.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/EventEnvelope.scala index d988a27e35..5e2af1f335 100644 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/EventEnvelope.scala +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/EventEnvelope.scala @@ -4,8 +4,76 @@ package akka.persistence.query +import scala.runtime.AbstractFunction4 + +import akka.util.HashCode + +// for binary compatibility (used to be a case class) +object EventEnvelope extends AbstractFunction4[Offset, String, Long, Any, EventEnvelope] { + def apply(offset: Offset, persistenceId: String, sequenceNr: Long, event: Any, timestamp: Long): EventEnvelope = + new EventEnvelope(offset, persistenceId, sequenceNr, event, timestamp) + + override def apply(offset: Offset, persistenceId: String, sequenceNr: Long, event: Any): EventEnvelope = + new EventEnvelope(offset, persistenceId, sequenceNr, event) + + def unapply(arg: EventEnvelope): Option[(Offset, String, Long, Any)] = + Some((arg.offset, arg.persistenceId, arg.sequenceNr, arg.timestamp)) + +} + /** * Event wrapper adding meta data for the events in the result stream of * [[akka.persistence.query.scaladsl.EventsByTagQuery]] query, or similar queries. + * + * The `timestamp` is the time the event was stored, in milliseconds since midnight, January 1, 1970 UTC + * (same as `System.currentTimeMillis`). */ -final case class EventEnvelope(offset: Offset, persistenceId: String, sequenceNr: Long, event: Any) +final class EventEnvelope( + val offset: Offset, + val persistenceId: String, + val sequenceNr: Long, + val event: Any, + val timestamp: Long) + extends Product4[Offset, String, Long, Any] + with Serializable { + + // for binary compatibility + def this(offset: Offset, persistenceId: String, sequenceNr: Long, event: Any) = + this(offset, persistenceId, sequenceNr, event, 0L) + + override def hashCode(): Int = { + var result = HashCode.SEED + result = HashCode.hash(result, offset) + result = HashCode.hash(result, persistenceId) + result = HashCode.hash(result, sequenceNr) + result = HashCode.hash(result, event) + result + } + + override def equals(obj: Any): Boolean = obj match { + case other: EventEnvelope => + offset == other.offset && persistenceId == other.persistenceId && sequenceNr == other.sequenceNr && + event == other.event // timestamp not included in equals for backwards compatibility + case _ => false + } + + override def toString: String = + s"EventEnvelope($offset,$persistenceId,$sequenceNr,$event,$timestamp)" + + // for binary compatibility (used to be a case class) + def copy( + offset: Offset = this.offset, + persistenceId: String = this.persistenceId, + sequenceNr: Long = this.sequenceNr, + event: Any = this.event): EventEnvelope = + new EventEnvelope(offset, persistenceId, sequenceNr, event, timestamp) + + // Product4, for binary compatibility (used to be a case class) + override def productPrefix = "EventEnvelope" + override def _1: Offset = offset + override def _2: String = persistenceId + override def _3: Long = sequenceNr + override def _4: Any = event + override def canEqual(that: Any): Boolean = that.isInstanceOf[EventEnvelope] + +} diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/EventsByPersistenceIdStage.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/EventsByPersistenceIdStage.scala index 4e25fa46da..b71d160ce6 100644 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/EventsByPersistenceIdStage.scala +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/EventsByPersistenceIdStage.scala @@ -108,7 +108,8 @@ final private[akka] class EventsByPersistenceIdStage( offset = Sequence(pr.sequenceNr), persistenceId = pr.persistenceId, sequenceNr = pr.sequenceNr, - event = pr.payload)) + event = pr.payload, + timestamp = pr.timestamp)) nextSequenceNr = pr.sequenceNr + 1 deliverBuf(out) diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/EventsByTagStage.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/EventsByTagStage.scala index 313fcd4970..d500b1cb8d 100644 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/EventsByTagStage.scala +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/EventsByTagStage.scala @@ -106,7 +106,8 @@ final private[leveldb] class EventsByTagStage( offset = Sequence(offset), persistenceId = p.persistenceId, sequenceNr = p.sequenceNr, - event = p.payload)) + event = p.payload, + timestamp = p.timestamp)) currOffset = offset deliverBuf(out) diff --git a/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/EventsByPersistenceIdSpec.scala b/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/EventsByPersistenceIdSpec.scala index 896afef2b4..50bc98fb61 100644 --- a/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/EventsByPersistenceIdSpec.scala +++ b/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/EventsByPersistenceIdSpec.scala @@ -11,9 +11,10 @@ import akka.persistence.query.scaladsl.EventsByTagQuery import akka.stream.testkit.scaladsl.TestSink import akka.testkit.AkkaSpec import akka.testkit.ImplicitSender - import scala.concurrent.duration._ +import akka.persistence.query.EventEnvelope + object EventsByPersistenceIdSpec { val config = """ akka.loglevel = INFO @@ -143,6 +144,19 @@ class EventsByPersistenceIdSpec extends AkkaSpec(EventsByPersistenceIdSpec.confi src.map(_.event).runWith(TestSink.probe[Any]).request(1).expectComplete() } + "include timestamp in EventEnvelope" in { + setup("m") + + val src = queries.currentEventsByPersistenceId("m", 0L, Long.MaxValue) + val probe = src.runWith(TestSink.probe[EventEnvelope]) + + probe.request(5) + probe.expectNext().timestamp should be > 0L + probe.expectNext().timestamp should be > 0L + probe.expectNext().timestamp should be > 0L + probe.expectComplete() + } + } "Leveldb live query EventsByPersistenceId" must { @@ -179,6 +193,18 @@ class EventsByPersistenceIdSpec extends AkkaSpec(EventsByPersistenceIdSpec.confi probe.expectNoMessage(100.millis).request(5).expectNext("e-3").expectNext("e-4") } + + "include timestamp in EventEnvelope" in { + setup("n") + + val src = queries.eventsByPersistenceId("n", 0L, Long.MaxValue) + val probe = src.runWith(TestSink.probe[EventEnvelope]) + + probe.request(5) + probe.expectNext().timestamp should be > 0L + probe.expectNext().timestamp should be > 0L + probe.cancel() + } } } diff --git a/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/EventsByTagSpec.scala b/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/EventsByTagSpec.scala index c037cefd59..db39ee66c4 100644 --- a/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/EventsByTagSpec.scala +++ b/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/EventsByTagSpec.scala @@ -157,6 +157,17 @@ class EventsByTagSpec extends AkkaSpec(EventsByTagSpec.config) with Cleanup with .expectComplete() } + + "include timestamp in EventEnvelope" in { + system.actorOf(TestActor.props("testTimestamp")) + val greenSrc = queries.currentEventsByTag(tag = "green", offset = Sequence(0L)) + val probe = greenSrc.runWith(TestSink.probe[EventEnvelope]) + + probe.request(2) + probe.expectNext().timestamp should be > 0L + probe.expectNext().timestamp should be > 0L + probe.cancel() + } } "Leveldb live query EventsByTag" must { diff --git a/akka-persistence-tck/src/main/scala/akka/persistence/journal/JournalSpec.scala b/akka-persistence-tck/src/main/scala/akka/persistence/journal/JournalSpec.scala index d20179007a..76d35d92fd 100644 --- a/akka-persistence-tck/src/main/scala/akka/persistence/journal/JournalSpec.scala +++ b/akka-persistence-tck/src/main/scala/akka/persistence/journal/JournalSpec.scala @@ -78,7 +78,7 @@ abstract class JournalSpec(config: Config) extension.journalFor(null) def replayedMessage(snr: Long, deleted: Boolean = false): ReplayedMessage = - ReplayedMessage(PersistentImpl(s"a-${snr}", snr, pid, "", deleted, Actor.noSender, writerUuid)) + ReplayedMessage(PersistentImpl(s"a-${snr}", snr, pid, "", deleted, Actor.noSender, writerUuid, 0L)) def writeMessages(fromSnr: Int, toSnr: Int, pid: String, sender: ActorRef, writerUuid: String): Unit = { @@ -111,7 +111,7 @@ abstract class JournalSpec(config: Config) probe.expectMsg(WriteMessagesSuccessful) (fromSnr to toSnr).foreach { i => probe.expectMsgPF() { - case WriteMessageSuccess(PersistentImpl(payload, `i`, `pid`, _, _, `sender`, `writerUuid`), _) => + case WriteMessageSuccess(PersistentImpl(payload, `i`, `pid`, _, _, `sender`, `writerUuid`, _), _) => payload should be(s"a-${i}") } } @@ -262,15 +262,15 @@ abstract class JournalSpec(config: Config) val Pid = pid val WriterUuid = writerUuid probe.expectMsgPF() { - case WriteMessageSuccess(PersistentImpl(payload, 6L, Pid, _, _, Actor.noSender, WriterUuid), _) => + case WriteMessageSuccess(PersistentImpl(payload, 6L, Pid, _, _, Actor.noSender, WriterUuid, _), _) => payload should be(s"b-6") } probe.expectMsgPF() { - case WriteMessageRejected(PersistentImpl(payload, 7L, Pid, _, _, Actor.noSender, WriterUuid), _, _) => + case WriteMessageRejected(PersistentImpl(payload, 7L, Pid, _, _, Actor.noSender, WriterUuid, _), _, _) => payload should be(notSerializableEvent) } probe.expectMsgPF() { - case WriteMessageSuccess(PersistentImpl(payload, 8L, Pid, _, _, Actor.noSender, WriterUuid), _) => + case WriteMessageSuccess(PersistentImpl(payload, 8L, Pid, _, _, Actor.noSender, WriterUuid, _), _) => payload should be(s"b-8") } } @@ -295,13 +295,13 @@ abstract class JournalSpec(config: Config) val Pid = pid val WriterUuid = writerUuid probe.expectMsgPF() { - case WriteMessageSuccess(PersistentImpl(payload, 6L, Pid, _, _, Actor.noSender, WriterUuid), _) => + case WriteMessageSuccess(PersistentImpl(payload, 6L, Pid, _, _, Actor.noSender, WriterUuid, _), _) => payload should be(event) } journal ! ReplayMessages(6, Long.MaxValue, Long.MaxValue, pid, receiverProbe.ref) receiverProbe.expectMsgPF() { - case ReplayedMessage(PersistentImpl(payload, 6L, Pid, _, _, Actor.noSender, WriterUuid)) => + case ReplayedMessage(PersistentImpl(payload, 6L, Pid, _, _, Actor.noSender, WriterUuid, _)) => payload should be(event) } receiverProbe.expectMsgPF() { diff --git a/akka-persistence/src/main/java/akka/persistence/serialization/MessageFormats.java b/akka-persistence/src/main/java/akka/persistence/serialization/MessageFormats.java index c7f36dd389..027ddb6b6f 100644 --- a/akka-persistence/src/main/java/akka/persistence/serialization/MessageFormats.java +++ b/akka-persistence/src/main/java/akka/persistence/serialization/MessageFormats.java @@ -158,6 +158,17 @@ public final class MessageFormats { */ akka.protobufv3.internal.ByteString getWriterUuidBytes(); + + /** + * optional sint64 timestamp = 14; + * @return Whether the timestamp field is set. + */ + boolean hasTimestamp(); + /** + * optional sint64 timestamp = 14; + * @return The timestamp. + */ + long getTimestamp(); } /** * Protobuf type {@code PersistentMessage} @@ -256,6 +267,11 @@ public final class MessageFormats { writerUuid_ = bs; break; } + case 112: { + bitField0_ |= 0x00000080; + timestamp_ = input.readSInt64(); + break; + } default: { if (!parseUnknownField( input, unknownFields, extensionRegistry, tag)) { @@ -558,6 +574,23 @@ public final class MessageFormats { } } + public static final int TIMESTAMP_FIELD_NUMBER = 14; + private long timestamp_; + /** + * optional sint64 timestamp = 14; + * @return Whether the timestamp field is set. + */ + public boolean hasTimestamp() { + return ((bitField0_ & 0x00000080) != 0); + } + /** + * optional sint64 timestamp = 14; + * @return The timestamp. + */ + public long getTimestamp() { + return timestamp_; + } + private byte memoizedIsInitialized = -1; @java.lang.Override public final boolean isInitialized() { @@ -599,6 +632,9 @@ public final class MessageFormats { if (((bitField0_ & 0x00000040) != 0)) { akka.protobufv3.internal.GeneratedMessageV3.writeString(output, 13, writerUuid_); } + if (((bitField0_ & 0x00000080) != 0)) { + output.writeSInt64(14, timestamp_); + } unknownFields.writeTo(output); } @@ -632,6 +668,10 @@ public final class MessageFormats { if (((bitField0_ & 0x00000040) != 0)) { size += akka.protobufv3.internal.GeneratedMessageV3.computeStringSize(13, writerUuid_); } + if (((bitField0_ & 0x00000080) != 0)) { + size += akka.protobufv3.internal.CodedOutputStream + .computeSInt64Size(14, timestamp_); + } size += unknownFields.getSerializedSize(); memoizedSize = size; return size; @@ -682,6 +722,11 @@ public final class MessageFormats { if (!getWriterUuid() .equals(other.getWriterUuid())) return false; } + if (hasTimestamp() != other.hasTimestamp()) return false; + if (hasTimestamp()) { + if (getTimestamp() + != other.getTimestamp()) return false; + } if (!unknownFields.equals(other.unknownFields)) return false; return true; } @@ -723,6 +768,11 @@ public final class MessageFormats { hash = (37 * hash) + WRITERUUID_FIELD_NUMBER; hash = (53 * hash) + getWriterUuid().hashCode(); } + if (hasTimestamp()) { + hash = (37 * hash) + TIMESTAMP_FIELD_NUMBER; + hash = (53 * hash) + akka.protobufv3.internal.Internal.hashLong( + getTimestamp()); + } hash = (29 * hash) + unknownFields.hashCode(); memoizedHashCode = hash; return hash; @@ -875,6 +925,8 @@ public final class MessageFormats { bitField0_ = (bitField0_ & ~0x00000020); writerUuid_ = ""; bitField0_ = (bitField0_ & ~0x00000040); + timestamp_ = 0L; + bitField0_ = (bitField0_ & ~0x00000080); return this; } @@ -935,6 +987,10 @@ public final class MessageFormats { to_bitField0_ |= 0x00000040; } result.writerUuid_ = writerUuid_; + if (((from_bitField0_ & 0x00000080) != 0)) { + result.timestamp_ = timestamp_; + to_bitField0_ |= 0x00000080; + } result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -1013,6 +1069,9 @@ public final class MessageFormats { writerUuid_ = other.writerUuid_; onChanged(); } + if (other.hasTimestamp()) { + setTimestamp(other.getTimestamp()); + } this.mergeUnknownFields(other.unknownFields); onChanged(); return this; @@ -1641,6 +1700,43 @@ public final class MessageFormats { onChanged(); return this; } + + private long timestamp_ ; + /** + * optional sint64 timestamp = 14; + * @return Whether the timestamp field is set. + */ + public boolean hasTimestamp() { + return ((bitField0_ & 0x00000080) != 0); + } + /** + * optional sint64 timestamp = 14; + * @return The timestamp. + */ + public long getTimestamp() { + return timestamp_; + } + /** + * optional sint64 timestamp = 14; + * @param value The timestamp to set. + * @return This builder for chaining. + */ + public Builder setTimestamp(long value) { + bitField0_ |= 0x00000080; + timestamp_ = value; + onChanged(); + return this; + } + /** + * optional sint64 timestamp = 14; + * @return This builder for chaining. + */ + public Builder clearTimestamp() { + bitField0_ = (bitField0_ & ~0x00000080); + timestamp_ = 0L; + onChanged(); + return this; + } @java.lang.Override public final Builder setUnknownFields( final akka.protobufv3.internal.UnknownFieldSet unknownFields) { @@ -6963,27 +7059,27 @@ public final class MessageFormats { descriptor; static { java.lang.String[] descriptorData = { - "\n\024MessageFormats.proto\"\252\001\n\021PersistentMes" + + "\n\024MessageFormats.proto\"\275\001\n\021PersistentMes" + "sage\022#\n\007payload\030\001 \001(\0132\022.PersistentPayloa" + "d\022\022\n\nsequenceNr\030\002 \001(\003\022\025\n\rpersistenceId\030\003" + " \001(\t\022\017\n\007deleted\030\004 \001(\010\022\016\n\006sender\030\013 \001(\t\022\020\n" + - "\010manifest\030\014 \001(\t\022\022\n\nwriterUuid\030\r \001(\t\"S\n\021P" + - "ersistentPayload\022\024\n\014serializerId\030\001 \002(\005\022\017" + - "\n\007payload\030\002 \002(\014\022\027\n\017payloadManifest\030\003 \001(\014" + - "\"2\n\013AtomicWrite\022#\n\007payload\030\001 \003(\0132\022.Persi" + - "stentMessage\"\356\001\n\033AtLeastOnceDeliverySnap" + - "shot\022\031\n\021currentDeliveryId\030\001 \002(\003\022O\n\025uncon" + - "firmedDeliveries\030\002 \003(\01320.AtLeastOnceDeli" + - "verySnapshot.UnconfirmedDelivery\032c\n\023Unco" + - "nfirmedDelivery\022\022\n\ndeliveryId\030\001 \002(\003\022\023\n\013d" + - "estination\030\002 \002(\t\022#\n\007payload\030\003 \002(\0132\022.Pers" + - "istentPayload\"\\\n\032PersistentStateChangeEv" + - "ent\022\027\n\017stateIdentifier\030\001 \002(\t\022\017\n\007timeout\030" + - "\002 \001(\t\022\024\n\014timeoutNanos\030\003 \001(\003\"h\n\025Persisten" + - "tFSMSnapshot\022\027\n\017stateIdentifier\030\001 \002(\t\022 \n" + - "\004data\030\002 \002(\0132\022.PersistentPayload\022\024\n\014timeo" + - "utNanos\030\003 \001(\003B\"\n\036akka.persistence.serial" + - "izationH\001" + "\010manifest\030\014 \001(\t\022\022\n\nwriterUuid\030\r \001(\t\022\021\n\tt" + + "imestamp\030\016 \001(\022\"S\n\021PersistentPayload\022\024\n\014s" + + "erializerId\030\001 \002(\005\022\017\n\007payload\030\002 \002(\014\022\027\n\017pa" + + "yloadManifest\030\003 \001(\014\"2\n\013AtomicWrite\022#\n\007pa" + + "yload\030\001 \003(\0132\022.PersistentMessage\"\356\001\n\033AtLe" + + "astOnceDeliverySnapshot\022\031\n\021currentDelive" + + "ryId\030\001 \002(\003\022O\n\025unconfirmedDeliveries\030\002 \003(" + + "\01320.AtLeastOnceDeliverySnapshot.Unconfir" + + "medDelivery\032c\n\023UnconfirmedDelivery\022\022\n\nde" + + "liveryId\030\001 \002(\003\022\023\n\013destination\030\002 \002(\t\022#\n\007p" + + "ayload\030\003 \002(\0132\022.PersistentPayload\"\\\n\032Pers" + + "istentStateChangeEvent\022\027\n\017stateIdentifie" + + "r\030\001 \002(\t\022\017\n\007timeout\030\002 \001(\t\022\024\n\014timeoutNanos" + + "\030\003 \001(\003\"h\n\025PersistentFSMSnapshot\022\027\n\017state" + + "Identifier\030\001 \002(\t\022 \n\004data\030\002 \002(\0132\022.Persist" + + "entPayload\022\024\n\014timeoutNanos\030\003 \001(\003B\"\n\036akka" + + ".persistence.serializationH\001" }; descriptor = akka.protobufv3.internal.Descriptors.FileDescriptor .internalBuildGeneratedFileFrom(descriptorData, @@ -6994,7 +7090,7 @@ public final class MessageFormats { internal_static_PersistentMessage_fieldAccessorTable = new akka.protobufv3.internal.GeneratedMessageV3.FieldAccessorTable( internal_static_PersistentMessage_descriptor, - new java.lang.String[] { "Payload", "SequenceNr", "PersistenceId", "Deleted", "Sender", "Manifest", "WriterUuid", }); + new java.lang.String[] { "Payload", "SequenceNr", "PersistenceId", "Deleted", "Sender", "Manifest", "WriterUuid", "Timestamp", }); internal_static_PersistentPayload_descriptor = getDescriptor().getMessageTypes().get(1); internal_static_PersistentPayload_fieldAccessorTable = new diff --git a/akka-persistence/src/main/mima-filters/2.6.1.backwards.excludes/28331-timestamp-PersistentRepr.excludes b/akka-persistence/src/main/mima-filters/2.6.1.backwards.excludes/28331-timestamp-PersistentRepr.excludes new file mode 100644 index 0000000000..3205e63430 --- /dev/null +++ b/akka-persistence/src/main/mima-filters/2.6.1.backwards.excludes/28331-timestamp-PersistentRepr.excludes @@ -0,0 +1,9 @@ +# #28331 Add optional timestamp to PersistentRepr +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.PersistentRepr.timestamp") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.PersistentRepr.withTimestamp") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.PersistentImpl.apply") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.PersistentImpl.copy") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.PersistentImpl.this") +ProblemFilters.exclude[Problem]("akka.persistence.PersistentImpl*") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.serialization.MessageFormats#PersistentMessageOrBuilder.hasTimestamp") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.serialization.MessageFormats#PersistentMessageOrBuilder.getTimestamp") diff --git a/akka-persistence/src/main/protobuf/MessageFormats.proto b/akka-persistence/src/main/protobuf/MessageFormats.proto index b55d05e03d..e5954743f1 100644 --- a/akka-persistence/src/main/protobuf/MessageFormats.proto +++ b/akka-persistence/src/main/protobuf/MessageFormats.proto @@ -20,6 +20,7 @@ message PersistentMessage { optional string sender = 11; // not stored in journal, needed for remote serialization optional string manifest = 12; optional string writerUuid = 13; + optional sint64 timestamp = 14; } message PersistentPayload { diff --git a/akka-persistence/src/main/scala/akka/persistence/Persistent.scala b/akka-persistence/src/main/scala/akka/persistence/Persistent.scala index 60a88d68b4..98caaa1fc8 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Persistent.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Persistent.scala @@ -9,6 +9,7 @@ import akka.persistence.serialization.Message import scala.collection.immutable import akka.annotation.DoNotInherit +import akka.util.HashCode /** * INTERNAL API @@ -91,6 +92,16 @@ final case class AtomicWrite(payload: immutable.Seq[PersistentRepr]) extends Per */ def sequenceNr: Long + /** + * The `timestamp` is the time the event was stored, in milliseconds since midnight, January 1, 1970 UTC + * (same as `System.currentTimeMillis`). + * + * Value `0` is used if undefined. + */ + def timestamp: Long + + def withTimestamp(newTimestamp: Long): PersistentRepr + /** * Unique identifier of the writing persistent actor. * Used to detect anomalies with overlapping writes from multiple @@ -152,7 +163,7 @@ object PersistentRepr { deleted: Boolean = false, sender: ActorRef = null, writerUuid: String = PersistentRepr.Undefined): PersistentRepr = - PersistentImpl(payload, sequenceNr, persistenceId, manifest, deleted, sender, writerUuid) + PersistentImpl(payload, sequenceNr, persistenceId, manifest, deleted, sender, writerUuid, 0L) /** * Java API, Plugin API. @@ -176,7 +187,8 @@ private[persistence] final case class PersistentImpl( override val manifest: String, override val deleted: Boolean, override val sender: ActorRef, - override val writerUuid: String) + override val writerUuid: String, + override val timestamp: Long) extends PersistentRepr with NoSerializationVerificationNeeded { @@ -187,6 +199,10 @@ private[persistence] final case class PersistentImpl( if (this.manifest == manifest) this else copy(manifest = manifest) + override def withTimestamp(newTimestamp: Long): PersistentRepr = + if (this.timestamp == newTimestamp) this + else copy(timestamp = newTimestamp) + def update(sequenceNr: Long, persistenceId: String, deleted: Boolean, sender: ActorRef, writerUuid: String) = copy( sequenceNr = sequenceNr, @@ -195,4 +211,25 @@ private[persistence] final case class PersistentImpl( sender = sender, writerUuid = writerUuid) + override def hashCode(): Int = { + var result = HashCode.SEED + result = HashCode.hash(result, payload) + result = HashCode.hash(result, sequenceNr) + result = HashCode.hash(result, persistenceId) + result = HashCode.hash(result, manifest) + result = HashCode.hash(result, deleted) + result = HashCode.hash(result, sender) + result = HashCode.hash(result, writerUuid) + // timestamp not included in equals for backwards compatibility + result + } + + override def equals(obj: Any): Boolean = obj match { + case other: PersistentImpl => + payload == other.payload && sequenceNr == other.sequenceNr && persistenceId == other.persistenceId && + manifest == other.manifest && deleted == other.deleted && + sender == other.sender && writerUuid == other.writerUuid // timestamp not included in equals for backwards compatibility + case _ => false + } + } diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbStore.scala b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbStore.scala index 201fafa29d..11caeee05c 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbStore.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbStore.scala @@ -166,7 +166,7 @@ private[persistence] trait LeveldbStore def persistentFromBytes(a: Array[Byte]): PersistentRepr = serialization.deserialize(a, classOf[PersistentRepr]).get private def addToMessageBatch(persistent: PersistentRepr, tags: Set[String], batch: WriteBatch): Unit = { - val persistentBytes = persistentToBytes(persistent) + val persistentBytes = persistentToBytes(persistent.withTimestamp(System.currentTimeMillis())) val nid = numericId(persistent.persistenceId) batch.put(keyToBytes(counterKey(nid)), counterToBytes(persistent.sequenceNr)) batch.put(keyToBytes(Key(nid, persistent.sequenceNr, 0)), persistentBytes) diff --git a/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala b/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala index 94ccca6d7b..f4ca47e722 100644 --- a/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala +++ b/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala @@ -168,6 +168,7 @@ class MessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer builder.setSequenceNr(persistent.sequenceNr) // deleted is not used in new records from 2.4 if (persistent.writerUuid != Undefined) builder.setWriterUuid(persistent.writerUuid) + if (persistent.timestamp > 0L) builder.setTimestamp(persistent.timestamp) builder } @@ -197,7 +198,7 @@ class MessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer // private def persistent(persistentMessage: mf.PersistentMessage): PersistentRepr = { - PersistentRepr( + val repr = PersistentRepr( payload(persistentMessage.getPayload), persistentMessage.getSequenceNr, if (persistentMessage.hasPersistenceId) persistentMessage.getPersistenceId else Undefined, @@ -206,6 +207,8 @@ class MessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer if (persistentMessage.hasSender) system.provider.resolveActorRef(persistentMessage.getSender) else Actor.noSender, if (persistentMessage.hasWriterUuid) persistentMessage.getWriterUuid else Undefined) + + if (persistentMessage.hasTimestamp) repr.withTimestamp(persistentMessage.getTimestamp) else repr } private def atomicWrite(atomicWrite: mf.AtomicWrite): AtomicWrite = {