diff --git a/akka-persistence-tck/src/main/scala/akka/persistence/PluginSpec.scala b/akka-persistence-tck/src/main/scala/akka/persistence/PluginSpec.scala
index 7dc0a34d80..9a13971417 100644
--- a/akka-persistence-tck/src/main/scala/akka/persistence/PluginSpec.scala
+++ b/akka-persistence-tck/src/main/scala/akka/persistence/PluginSpec.scala
@@ -1,28 +1,28 @@
package akka.persistence
import java.util.concurrent.atomic.AtomicInteger
-
import scala.reflect.ClassTag
-
import akka.actor._
import akka.testkit._
-
import com.typesafe.config._
-
import org.scalatest._
+import java.util.UUID
abstract class PluginSpec(val config: Config) extends TestKitBase with WordSpecLike with Matchers with BeforeAndAfterAll with BeforeAndAfterEach {
private val counter = new AtomicInteger(0)
private var _extension: Persistence = _
private var _pid: String = _
+ private var _writerUuid: String = _
// used to avoid messages be delivered to a restarted actor,
// this is akka-persistence internals and journals themselves don't really care
protected val actorInstanceId = 1
- override protected def beforeEach(): Unit =
+ override protected def beforeEach(): Unit = {
_pid = s"p-${counter.incrementAndGet()}"
+ _writerUuid = UUID.randomUUID.toString
+ }
override protected def beforeAll(): Unit =
_extension = Persistence(system)
@@ -30,11 +30,11 @@ abstract class PluginSpec(val config: Config) extends TestKitBase with WordSpecL
override protected def afterAll(): Unit =
shutdown(system)
- def extension: Persistence =
- _extension
+ def extension: Persistence = _extension
- def pid: String =
- _pid
+ def pid: String = _pid
+
+ def writerUuid: String = _writerUuid
def subscribe[T: ClassTag](subscriber: ActorRef) =
system.eventStream.subscribe(subscriber, implicitly[ClassTag[T]].runtimeClass)
diff --git a/akka-persistence-tck/src/main/scala/akka/persistence/journal/JournalSpec.scala b/akka-persistence-tck/src/main/scala/akka/persistence/journal/JournalSpec.scala
index 39a649b408..ba287ccece 100644
--- a/akka-persistence-tck/src/main/scala/akka/persistence/journal/JournalSpec.scala
+++ b/akka-persistence-tck/src/main/scala/akka/persistence/journal/JournalSpec.scala
@@ -39,7 +39,7 @@ abstract class JournalSpec(config: Config) extends PluginSpec(config) {
super.beforeEach()
senderProbe = TestProbe()
receiverProbe = TestProbe()
- writeMessages(1, 5, pid, senderProbe.ref)
+ writeMessages(1, 5, pid, senderProbe.ref, writerUuid)
}
/**
@@ -52,22 +52,26 @@ abstract class JournalSpec(config: Config) extends PluginSpec(config) {
extension.journalFor(null)
def replayedMessage(snr: Long, deleted: Boolean = false, confirms: Seq[String] = Nil): ReplayedMessage =
- ReplayedMessage(PersistentImpl(s"a-${snr}", snr, pid, "", deleted, Actor.noSender))
+ ReplayedMessage(PersistentImpl(s"a-${snr}", snr, pid, "", deleted, Actor.noSender, writerUuid))
- def writeMessages(fromSnr: Int, toSnr: Int, pid: String, sender: ActorRef): Unit = {
+ def writeMessages(fromSnr: Int, toSnr: Int, pid: String, sender: ActorRef, writerUuid: String): Unit = {
val msgs =
if (supportsAtomicPersistAllOfSeveralEvents)
(fromSnr to toSnr).map { i ⇒
- AtomicWrite(PersistentRepr(payload = s"a-$i", sequenceNr = i, persistenceId = pid, sender = sender))
+ AtomicWrite(PersistentRepr(payload = s"a-$i", sequenceNr = i, persistenceId = pid, sender = sender,
+ writerUuid = writerUuid))
}
else
(fromSnr to toSnr - 1).map { i ⇒
if (i == toSnr - 1)
AtomicWrite(List(
- PersistentRepr(payload = s"a-$i", sequenceNr = i, persistenceId = pid, sender = sender),
- PersistentRepr(payload = s"a-${i + 1}", sequenceNr = i + 1, persistenceId = pid, sender = sender)))
+ PersistentRepr(payload = s"a-$i", sequenceNr = i, persistenceId = pid, sender = sender,
+ writerUuid = writerUuid),
+ PersistentRepr(payload = s"a-${i + 1}", sequenceNr = i + 1, persistenceId = pid, sender = sender,
+ writerUuid = writerUuid)))
else
- AtomicWrite(PersistentRepr(payload = s"a-${i}", sequenceNr = i, persistenceId = pid, sender = sender))
+ AtomicWrite(PersistentRepr(payload = s"a-${i}", sequenceNr = i, persistenceId = pid, sender = sender,
+ writerUuid = writerUuid))
}
val probe = TestProbe()
@@ -76,7 +80,10 @@ abstract class JournalSpec(config: Config) extends PluginSpec(config) {
probe.expectMsg(WriteMessagesSuccessful)
fromSnr to toSnr foreach { i ⇒
- probe.expectMsgPF() { case WriteMessageSuccess(PersistentImpl(payload, `i`, `pid`, _, _, `sender`), _) ⇒ payload should be(s"a-${i}") }
+ probe.expectMsgPF() {
+ case WriteMessageSuccess(PersistentImpl(payload, `i`, `pid`, _, _, `sender`, `writerUuid`), _) ⇒
+ payload should be(s"a-${i}")
+ }
}
}
@@ -158,7 +165,8 @@ abstract class JournalSpec(config: Config) extends PluginSpec(config) {
val notSerializableEvent = new Object { override def toString = "not serializable" }
val msgs = (6 to 8).map { i ⇒
val event = if (i == 7) notSerializableEvent else s"b-$i"
- AtomicWrite(PersistentRepr(payload = event, sequenceNr = i, persistenceId = pid, sender = Actor.noSender))
+ AtomicWrite(PersistentRepr(payload = event, sequenceNr = i, persistenceId = pid, sender = Actor.noSender,
+ writerUuid = writerUuid))
}
val probe = TestProbe()
@@ -166,15 +174,16 @@ abstract class JournalSpec(config: Config) extends PluginSpec(config) {
probe.expectMsg(WriteMessagesSuccessful)
val Pid = pid
+ val WriterUuid = writerUuid
probe.expectMsgPF() {
- case WriteMessageSuccess(PersistentImpl(payload, 6L, Pid, _, _, Actor.noSender), _) ⇒ payload should be(s"b-6")
+ case WriteMessageSuccess(PersistentImpl(payload, 6L, Pid, _, _, Actor.noSender, WriterUuid), _) ⇒ payload should be(s"b-6")
}
probe.expectMsgPF() {
- case WriteMessageRejected(PersistentImpl(payload, 7L, Pid, _, _, Actor.noSender), _, _) ⇒
+ case WriteMessageRejected(PersistentImpl(payload, 7L, Pid, _, _, Actor.noSender, WriterUuid), _, _) ⇒
payload should be(notSerializableEvent)
}
probe.expectMsgPF() {
- case WriteMessageSuccess(PersistentImpl(payload, 8L, Pid, _, _, Actor.noSender), _) ⇒ payload should be(s"b-8")
+ case WriteMessageSuccess(PersistentImpl(payload, 8L, Pid, _, _, Actor.noSender, WriterUuid), _) ⇒ payload should be(s"b-8")
}
}
diff --git a/akka-persistence/src/main/java/akka/persistence/serialization/MessageFormats.java b/akka-persistence/src/main/java/akka/persistence/serialization/MessageFormats.java
index b5d7500aa3..4c7d2985a7 100644
--- a/akka-persistence/src/main/java/akka/persistence/serialization/MessageFormats.java
+++ b/akka-persistence/src/main/java/akka/persistence/serialization/MessageFormats.java
@@ -113,6 +113,21 @@ public final class MessageFormats {
*/
com.google.protobuf.ByteString
getManifestBytes();
+
+ // optional string writerUuid = 13;
+ /**
+ * optional string writerUuid = 13;
+ */
+ boolean hasWriterUuid();
+ /**
+ * optional string writerUuid = 13;
+ */
+ java.lang.String getWriterUuid();
+ /**
+ * optional string writerUuid = 13;
+ */
+ com.google.protobuf.ByteString
+ getWriterUuidBytes();
}
/**
* Protobuf type {@code PersistentMessage}
@@ -203,6 +218,11 @@ public final class MessageFormats {
manifest_ = input.readBytes();
break;
}
+ case 106: {
+ bitField0_ |= 0x00000040;
+ writerUuid_ = input.readBytes();
+ break;
+ }
}
}
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -450,6 +470,49 @@ public final class MessageFormats {
}
}
+ // optional string writerUuid = 13;
+ public static final int WRITERUUID_FIELD_NUMBER = 13;
+ private java.lang.Object writerUuid_;
+ /**
+ * optional string writerUuid = 13;
+ */
+ public boolean hasWriterUuid() {
+ return ((bitField0_ & 0x00000040) == 0x00000040);
+ }
+ /**
+ * optional string writerUuid = 13;
+ */
+ public java.lang.String getWriterUuid() {
+ java.lang.Object ref = writerUuid_;
+ if (ref instanceof java.lang.String) {
+ return (java.lang.String) ref;
+ } else {
+ com.google.protobuf.ByteString bs =
+ (com.google.protobuf.ByteString) ref;
+ java.lang.String s = bs.toStringUtf8();
+ if (bs.isValidUtf8()) {
+ writerUuid_ = s;
+ }
+ return s;
+ }
+ }
+ /**
+ * optional string writerUuid = 13;
+ */
+ public com.google.protobuf.ByteString
+ getWriterUuidBytes() {
+ java.lang.Object ref = writerUuid_;
+ if (ref instanceof java.lang.String) {
+ com.google.protobuf.ByteString b =
+ com.google.protobuf.ByteString.copyFromUtf8(
+ (java.lang.String) ref);
+ writerUuid_ = b;
+ return b;
+ } else {
+ return (com.google.protobuf.ByteString) ref;
+ }
+ }
+
private void initFields() {
payload_ = akka.persistence.serialization.MessageFormats.PersistentPayload.getDefaultInstance();
sequenceNr_ = 0L;
@@ -457,6 +520,7 @@ public final class MessageFormats {
deleted_ = false;
sender_ = "";
manifest_ = "";
+ writerUuid_ = "";
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -494,6 +558,9 @@ public final class MessageFormats {
if (((bitField0_ & 0x00000020) == 0x00000020)) {
output.writeBytes(12, getManifestBytes());
}
+ if (((bitField0_ & 0x00000040) == 0x00000040)) {
+ output.writeBytes(13, getWriterUuidBytes());
+ }
getUnknownFields().writeTo(output);
}
@@ -527,6 +594,10 @@ public final class MessageFormats {
size += com.google.protobuf.CodedOutputStream
.computeBytesSize(12, getManifestBytes());
}
+ if (((bitField0_ & 0x00000040) == 0x00000040)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeBytesSize(13, getWriterUuidBytes());
+ }
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@@ -660,6 +731,8 @@ public final class MessageFormats {
bitField0_ = (bitField0_ & ~0x00000010);
manifest_ = "";
bitField0_ = (bitField0_ & ~0x00000020);
+ writerUuid_ = "";
+ bitField0_ = (bitField0_ & ~0x00000040);
return this;
}
@@ -716,6 +789,10 @@ public final class MessageFormats {
to_bitField0_ |= 0x00000020;
}
result.manifest_ = manifest_;
+ if (((from_bitField0_ & 0x00000040) == 0x00000040)) {
+ to_bitField0_ |= 0x00000040;
+ }
+ result.writerUuid_ = writerUuid_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@@ -756,6 +833,11 @@ public final class MessageFormats {
manifest_ = other.manifest_;
onChanged();
}
+ if (other.hasWriterUuid()) {
+ bitField0_ |= 0x00000040;
+ writerUuid_ = other.writerUuid_;
+ onChanged();
+ }
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@@ -1242,6 +1324,80 @@ public final class MessageFormats {
return this;
}
+ // optional string writerUuid = 13;
+ private java.lang.Object writerUuid_ = "";
+ /**
+ * optional string writerUuid = 13;
+ */
+ public boolean hasWriterUuid() {
+ return ((bitField0_ & 0x00000040) == 0x00000040);
+ }
+ /**
+ * optional string writerUuid = 13;
+ */
+ public java.lang.String getWriterUuid() {
+ java.lang.Object ref = writerUuid_;
+ if (!(ref instanceof java.lang.String)) {
+ java.lang.String s = ((com.google.protobuf.ByteString) ref)
+ .toStringUtf8();
+ writerUuid_ = s;
+ return s;
+ } else {
+ return (java.lang.String) ref;
+ }
+ }
+ /**
+ * optional string writerUuid = 13;
+ */
+ public com.google.protobuf.ByteString
+ getWriterUuidBytes() {
+ java.lang.Object ref = writerUuid_;
+ if (ref instanceof String) {
+ com.google.protobuf.ByteString b =
+ com.google.protobuf.ByteString.copyFromUtf8(
+ (java.lang.String) ref);
+ writerUuid_ = b;
+ return b;
+ } else {
+ return (com.google.protobuf.ByteString) ref;
+ }
+ }
+ /**
+ * optional string writerUuid = 13;
+ */
+ public Builder setWriterUuid(
+ java.lang.String value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00000040;
+ writerUuid_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * optional string writerUuid = 13;
+ */
+ public Builder clearWriterUuid() {
+ bitField0_ = (bitField0_ & ~0x00000040);
+ writerUuid_ = getDefaultInstance().getWriterUuid();
+ onChanged();
+ return this;
+ }
+ /**
+ * optional string writerUuid = 13;
+ */
+ public Builder setWriterUuidBytes(
+ com.google.protobuf.ByteString value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00000040;
+ writerUuid_ = value;
+ onChanged();
+ return this;
+ }
+
// @@protoc_insertion_point(builder_scope:PersistentMessage)
}
@@ -4747,23 +4903,24 @@ public final class MessageFormats {
descriptor;
static {
java.lang.String[] descriptorData = {
- "\n\024MessageFormats.proto\"\226\001\n\021PersistentMes" +
+ "\n\024MessageFormats.proto\"\252\001\n\021PersistentMes" +
"sage\022#\n\007payload\030\001 \001(\0132\022.PersistentPayloa" +
"d\022\022\n\nsequenceNr\030\002 \001(\003\022\025\n\rpersistenceId\030\003" +
" \001(\t\022\017\n\007deleted\030\004 \001(\010\022\016\n\006sender\030\013 \001(\t\022\020\n" +
- "\010manifest\030\014 \001(\t\"S\n\021PersistentPayload\022\024\n\014" +
- "serializerId\030\001 \002(\005\022\017\n\007payload\030\002 \002(\014\022\027\n\017p" +
- "ayloadManifest\030\003 \001(\014\"2\n\013AtomicWrite\022#\n\007p" +
- "ayload\030\001 \003(\0132\022.PersistentMessage\"\356\001\n\033AtL" +
- "eastOnceDeliverySnapshot\022\031\n\021currentDeliv" +
- "eryId\030\001 \002(\003\022O\n\025unconfirmedDeliveries\030\002 \003",
- "(\01320.AtLeastOnceDeliverySnapshot.Unconfi" +
- "rmedDelivery\032c\n\023UnconfirmedDelivery\022\022\n\nd" +
- "eliveryId\030\001 \002(\003\022\023\n\013destination\030\002 \002(\t\022#\n\007" +
- "payload\030\003 \002(\0132\022.PersistentPayload\"F\n\032Per" +
- "sistentStateChangeEvent\022\027\n\017stateIdentifi" +
- "er\030\001 \002(\t\022\017\n\007timeout\030\002 \001(\tB\"\n\036akka.persis" +
- "tence.serializationH\001"
+ "\010manifest\030\014 \001(\t\022\022\n\nwriterUuid\030\r \001(\t\"S\n\021P" +
+ "ersistentPayload\022\024\n\014serializerId\030\001 \002(\005\022\017" +
+ "\n\007payload\030\002 \002(\014\022\027\n\017payloadManifest\030\003 \001(\014" +
+ "\"2\n\013AtomicWrite\022#\n\007payload\030\001 \003(\0132\022.Persi" +
+ "stentMessage\"\356\001\n\033AtLeastOnceDeliverySnap" +
+ "shot\022\031\n\021currentDeliveryId\030\001 \002(\003\022O\n\025uncon",
+ "firmedDeliveries\030\002 \003(\01320.AtLeastOnceDeli" +
+ "verySnapshot.UnconfirmedDelivery\032c\n\023Unco" +
+ "nfirmedDelivery\022\022\n\ndeliveryId\030\001 \002(\003\022\023\n\013d" +
+ "estination\030\002 \002(\t\022#\n\007payload\030\003 \002(\0132\022.Pers" +
+ "istentPayload\"F\n\032PersistentStateChangeEv" +
+ "ent\022\027\n\017stateIdentifier\030\001 \002(\t\022\017\n\007timeout\030" +
+ "\002 \001(\tB\"\n\036akka.persistence.serializationH" +
+ "\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -4775,7 +4932,7 @@ public final class MessageFormats {
internal_static_PersistentMessage_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_PersistentMessage_descriptor,
- new java.lang.String[] { "Payload", "SequenceNr", "PersistenceId", "Deleted", "Sender", "Manifest", });
+ new java.lang.String[] { "Payload", "SequenceNr", "PersistenceId", "Deleted", "Sender", "Manifest", "WriterUuid", });
internal_static_PersistentPayload_descriptor =
getDescriptor().getMessageTypes().get(1);
internal_static_PersistentPayload_fieldAccessorTable = new
diff --git a/akka-persistence/src/main/protobuf/MessageFormats.proto b/akka-persistence/src/main/protobuf/MessageFormats.proto
index 34e4e0f577..be4d03ba19 100644
--- a/akka-persistence/src/main/protobuf/MessageFormats.proto
+++ b/akka-persistence/src/main/protobuf/MessageFormats.proto
@@ -17,6 +17,7 @@ message PersistentMessage {
// optional string confirmTarget = 10; // Removed in 2.4
optional string sender = 11; // not stored in journal, needed for remote serialization
optional string manifest = 12;
+ optional string writerUuid = 13;
}
message PersistentPayload {
diff --git a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala
index 65c07079b6..91d873ffda 100644
--- a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala
+++ b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala
@@ -12,6 +12,7 @@ import akka.actor.StashFactory
import akka.event.Logging
import akka.event.LoggingAdapter
import akka.actor.ActorRef
+import java.util.UUID
/**
* INTERNAL API
@@ -47,6 +48,7 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas
private[persistence] lazy val snapshotStore = extension.snapshotStoreFor(snapshotPluginId)
private val instanceId: Int = Eventsourced.instanceIdCounter.getAndIncrement()
+ private val writerUuid = UUID.randomUUID.toString
private var journalBatch = Vector.empty[PersistentEnvelope]
private val maxMessageBatchSize = extension.settings.journal.maxMessageBatchSize
@@ -615,7 +617,7 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas
private def addToBatch(p: PersistentEnvelope): Unit = p match {
case a: AtomicWrite ⇒
journalBatch :+= a.copy(payload =
- a.payload.map(_.update(persistenceId = persistenceId, sequenceNr = nextSequenceNr())))
+ a.payload.map(_.update(persistenceId = persistenceId, sequenceNr = nextSequenceNr(), writerUuid = writerUuid)))
case r: PersistentEnvelope ⇒
journalBatch :+= r
}
diff --git a/akka-persistence/src/main/scala/akka/persistence/Persistent.scala b/akka-persistence/src/main/scala/akka/persistence/Persistent.scala
index 29919c155f..900c4d296d 100644
--- a/akka-persistence/src/main/scala/akka/persistence/Persistent.scala
+++ b/akka-persistence/src/main/scala/akka/persistence/Persistent.scala
@@ -71,6 +71,13 @@ trait PersistentRepr extends Message {
*/
def sequenceNr: Long
+ /**
+ * Unique identifier of the writing persistent actor.
+ * Used to detect anomalies with overlapping writes from multiple
+ * persistent actors, which can result in inconsistent replays.
+ */
+ def writerUuid: String
+
/**
* Creates a new persistent message with the specified `payload`.
*/
@@ -100,7 +107,8 @@ trait PersistentRepr extends Message {
sequenceNr: Long = sequenceNr,
persistenceId: String = persistenceId,
deleted: Boolean = deleted,
- sender: ActorRef = sender): PersistentRepr
+ sender: ActorRef = sender,
+ writerUuid: String = writerUuid): PersistentRepr
}
object PersistentRepr {
@@ -118,8 +126,9 @@ object PersistentRepr {
persistenceId: String = PersistentRepr.Undefined,
manifest: String = PersistentRepr.Undefined,
deleted: Boolean = false,
- sender: ActorRef = null): PersistentRepr =
- PersistentImpl(payload, sequenceNr, persistenceId, manifest, deleted, sender)
+ sender: ActorRef = null,
+ writerUuid: String = PersistentRepr.Undefined): PersistentRepr =
+ PersistentImpl(payload, sequenceNr, persistenceId, manifest, deleted, sender, writerUuid)
/**
* Java API, Plugin API.
@@ -142,7 +151,8 @@ private[persistence] final case class PersistentImpl(
override val persistenceId: String,
override val manifest: String,
override val deleted: Boolean,
- override val sender: ActorRef) extends PersistentRepr {
+ override val sender: ActorRef,
+ override val writerUuid: String) extends PersistentRepr {
def withPayload(payload: Any): PersistentRepr =
copy(payload = payload)
@@ -151,8 +161,13 @@ private[persistence] final case class PersistentImpl(
if (this.manifest == manifest) this
else copy(manifest = manifest)
- def update(sequenceNr: Long, persistenceId: String, deleted: Boolean, sender: ActorRef) =
- copy(sequenceNr = sequenceNr, persistenceId = persistenceId, deleted = deleted, sender = sender)
+ def update(sequenceNr: Long, persistenceId: String, deleted: Boolean, sender: ActorRef, writerUuid: String) =
+ copy(
+ sequenceNr = sequenceNr,
+ persistenceId = persistenceId,
+ deleted = deleted,
+ sender = sender,
+ writerUuid = writerUuid)
}
diff --git a/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala b/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala
index 63a2238ae3..9f15010f1c 100644
--- a/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala
+++ b/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala
@@ -134,6 +134,7 @@ class MessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer
builder.setPayload(persistentPayloadBuilder(persistent.payload.asInstanceOf[AnyRef]))
builder.setSequenceNr(persistent.sequenceNr)
// deleted is not used in new records from 2.4
+ if (persistent.writerUuid != Undefined) builder.setWriterUuid(persistent.writerUuid)
builder
}
@@ -175,7 +176,8 @@ class MessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer
if (persistentMessage.hasPersistenceId) persistentMessage.getPersistenceId else Undefined,
if (persistentMessage.hasManifest) persistentMessage.getManifest else Undefined,
if (persistentMessage.hasDeleted) persistentMessage.getDeleted else false,
- if (persistentMessage.hasSender) system.provider.resolveActorRef(persistentMessage.getSender) else Actor.noSender)
+ if (persistentMessage.hasSender) system.provider.resolveActorRef(persistentMessage.getSender) else Actor.noSender,
+ if (persistentMessage.hasWriterUuid) persistentMessage.getWriterUuid else Undefined)
}
private def atomicWrite(atomicWrite: mf.AtomicWrite): AtomicWrite = {
diff --git a/akka-persistence/src/test/scala/akka/persistence/PersistentActorFailureSpec.scala b/akka-persistence/src/test/scala/akka/persistence/PersistentActorFailureSpec.scala
index 7acaf56876..f377201f55 100644
--- a/akka-persistence/src/test/scala/akka/persistence/PersistentActorFailureSpec.scala
+++ b/akka-persistence/src/test/scala/akka/persistence/PersistentActorFailureSpec.scala
@@ -9,7 +9,7 @@ import akka.actor.{ OneForOneStrategy, _ }
import akka.persistence.journal.AsyncWriteProxy
import akka.persistence.journal.AsyncWriteTarget.{ ReplayFailure, ReplayMessages, ReplaySuccess, WriteMessages }
import akka.persistence.journal.inmem.InmemStore
-import akka.testkit.{ ImplicitSender, TestProbe }
+import akka.testkit.{ ImplicitSender, TestProbe, TestEvent, EventFilter }
import akka.util.Timeout
import scala.concurrent.duration._
@@ -147,6 +147,9 @@ class PersistentActorFailureSpec extends PersistenceSpec(PersistenceSpec.config(
import PersistentActorFailureSpec._
import PersistentActorSpec._
+ system.eventStream.publish(TestEvent.Mute(
+ EventFilter[akka.pattern.AskTimeoutException]()))
+
def prepareFailingRecovery(): Unit = {
val persistentActor = namedPersistentActor[FailingRecovery]
persistentActor ! Cmd("a")
diff --git a/akka-persistence/src/test/scala/akka/persistence/serialization/SerializerSpec.scala b/akka-persistence/src/test/scala/akka/persistence/serialization/SerializerSpec.scala
index 237878c3de..58bd94c7ba 100644
--- a/akka-persistence/src/test/scala/akka/persistence/serialization/SerializerSpec.scala
+++ b/akka-persistence/src/test/scala/akka/persistence/serialization/SerializerSpec.scala
@@ -4,6 +4,7 @@
package akka.persistence.serialization
+import java.util.UUID
import akka.actor._
import akka.persistence.AtLeastOnceDelivery.{ AtLeastOnceDeliverySnapshot, UnconfirmedDelivery }
import akka.persistence._
@@ -145,7 +146,7 @@ class MessageSerializerPersistenceSpec extends AkkaSpec(customSerializers) {
"A message serializer" when {
"not given a manifest" must {
"handle custom Persistent message serialization" in {
- val persistent = PersistentRepr(MyPayload("a"), 13, "p1", "")
+ val persistent = PersistentRepr(MyPayload("a"), 13, "p1", "", writerUuid = UUID.randomUUID().toString)
val serializer = serialization.findSerializerFor(persistent)
val bytes = serializer.toBinary(persistent)
@@ -157,7 +158,7 @@ class MessageSerializerPersistenceSpec extends AkkaSpec(customSerializers) {
"given a PersistentRepr manifest" must {
"handle custom Persistent message serialization" in {
- val persistent = PersistentRepr(MyPayload("b"), 13, "p1", "")
+ val persistent = PersistentRepr(MyPayload("b"), 13, "p1", "", writerUuid = UUID.randomUUID().toString)
val serializer = serialization.findSerializerFor(persistent)
val bytes = serializer.toBinary(persistent)
@@ -169,7 +170,7 @@ class MessageSerializerPersistenceSpec extends AkkaSpec(customSerializers) {
"given payload serializer with string manifest" must {
"handle serialization" in {
- val persistent = PersistentRepr(MyPayload2("a", 17), 13, "p1", "")
+ val persistent = PersistentRepr(MyPayload2("a", 17), 13, "p1", "", writerUuid = UUID.randomUUID().toString)
val serializer = serialization.findSerializerFor(persistent)
val bytes = serializer.toBinary(persistent)