diff --git a/akka-persistence-tck/src/main/scala/akka/persistence/journal/JournalSpec.scala b/akka-persistence-tck/src/main/scala/akka/persistence/journal/JournalSpec.scala index 48f33d658a..2394b3b160 100644 --- a/akka-persistence-tck/src/main/scala/akka/persistence/journal/JournalSpec.scala +++ b/akka-persistence-tck/src/main/scala/akka/persistence/journal/JournalSpec.scala @@ -46,7 +46,7 @@ abstract class JournalSpec(config: Config) extends PluginSpec(config) { extension.journalFor(null) def replayedMessage(snr: Long, deleted: Boolean = false, confirms: Seq[String] = Nil): ReplayedMessage = - ReplayedMessage(PersistentImpl(s"a-${snr}", snr, pid, deleted, senderProbe.ref)) + ReplayedMessage(PersistentImpl(s"a-${snr}", snr, pid, deleted, Actor.noSender)) def writeMessages(from: Int, to: Int, pid: String, sender: ActorRef): Unit = { val msgs = from to to map { i ⇒ PersistentRepr(payload = s"a-${i}", sequenceNr = i, persistenceId = pid, sender = sender) } diff --git a/akka-persistence/src/main/java/akka/persistence/serialization/MessageFormats.java b/akka-persistence/src/main/java/akka/persistence/serialization/MessageFormats.java index 9cd5f561c9..bd7be6de1b 100644 --- a/akka-persistence/src/main/java/akka/persistence/serialization/MessageFormats.java +++ b/akka-persistence/src/main/java/akka/persistence/serialization/MessageFormats.java @@ -69,7 +69,7 @@ public final class MessageFormats { * repeated string confirms = 7; // Removed in 2.4 * optional bool confirmable = 8; // Removed in 2.4 * optional DeliveredMessage confirmMessage = 9; // Removed in 2.4 - * optional string confirmTarget = 10; + * optional string confirmTarget = 10; // Removed in 2.4 * */ boolean hasSender(); @@ -81,7 +81,7 @@ public final class MessageFormats { * repeated string confirms = 7; // Removed in 2.4 * optional bool confirmable = 8; // Removed in 2.4 * optional DeliveredMessage confirmMessage = 9; // Removed in 2.4 - * optional string confirmTarget = 10; + * optional string confirmTarget = 10; // Removed in 2.4 * */ java.lang.String getSender(); @@ -93,7 +93,7 @@ public final class MessageFormats { * repeated string confirms = 7; // Removed in 2.4 * optional bool confirmable = 8; // Removed in 2.4 * optional DeliveredMessage confirmMessage = 9; // Removed in 2.4 - * optional string confirmTarget = 10; + * optional string confirmTarget = 10; // Removed in 2.4 * */ com.google.protobuf.ByteString @@ -331,7 +331,7 @@ public final class MessageFormats { * repeated string confirms = 7; // Removed in 2.4 * optional bool confirmable = 8; // Removed in 2.4 * optional DeliveredMessage confirmMessage = 9; // Removed in 2.4 - * optional string confirmTarget = 10; + * optional string confirmTarget = 10; // Removed in 2.4 * */ public boolean hasSender() { @@ -345,7 +345,7 @@ public final class MessageFormats { * repeated string confirms = 7; // Removed in 2.4 * optional bool confirmable = 8; // Removed in 2.4 * optional DeliveredMessage confirmMessage = 9; // Removed in 2.4 - * optional string confirmTarget = 10; + * optional string confirmTarget = 10; // Removed in 2.4 * */ public java.lang.String getSender() { @@ -370,7 +370,7 @@ public final class MessageFormats { * repeated string confirms = 7; // Removed in 2.4 * optional bool confirmable = 8; // Removed in 2.4 * optional DeliveredMessage confirmMessage = 9; // Removed in 2.4 - * optional string confirmTarget = 10; + * optional string confirmTarget = 10; // Removed in 2.4 * */ public com.google.protobuf.ByteString @@ -974,7 +974,7 @@ public final class MessageFormats { * repeated string confirms = 7; // Removed in 2.4 * optional bool confirmable = 8; // Removed in 2.4 * optional DeliveredMessage confirmMessage = 9; // Removed in 2.4 - * optional string confirmTarget = 10; + * optional string confirmTarget = 10; // Removed in 2.4 * */ public boolean hasSender() { @@ -988,7 +988,7 @@ public final class MessageFormats { * repeated string confirms = 7; // Removed in 2.4 * optional bool confirmable = 8; // Removed in 2.4 * optional DeliveredMessage confirmMessage = 9; // Removed in 2.4 - * optional string confirmTarget = 10; + * optional string confirmTarget = 10; // Removed in 2.4 * */ public java.lang.String getSender() { @@ -1010,7 +1010,7 @@ public final class MessageFormats { * repeated string confirms = 7; // Removed in 2.4 * optional bool confirmable = 8; // Removed in 2.4 * optional DeliveredMessage confirmMessage = 9; // Removed in 2.4 - * optional string confirmTarget = 10; + * optional string confirmTarget = 10; // Removed in 2.4 * */ public com.google.protobuf.ByteString @@ -1034,7 +1034,7 @@ public final class MessageFormats { * repeated string confirms = 7; // Removed in 2.4 * optional bool confirmable = 8; // Removed in 2.4 * optional DeliveredMessage confirmMessage = 9; // Removed in 2.4 - * optional string confirmTarget = 10; + * optional string confirmTarget = 10; // Removed in 2.4 * */ public Builder setSender( @@ -1055,7 +1055,7 @@ public final class MessageFormats { * repeated string confirms = 7; // Removed in 2.4 * optional bool confirmable = 8; // Removed in 2.4 * optional DeliveredMessage confirmMessage = 9; // Removed in 2.4 - * optional string confirmTarget = 10; + * optional string confirmTarget = 10; // Removed in 2.4 * */ public Builder clearSender() { @@ -1072,7 +1072,7 @@ public final class MessageFormats { * repeated string confirms = 7; // Removed in 2.4 * optional bool confirmable = 8; // Removed in 2.4 * optional DeliveredMessage confirmMessage = 9; // Removed in 2.4 - * optional string confirmTarget = 10; + * optional string confirmTarget = 10; // Removed in 2.4 * */ public Builder setSenderBytes( diff --git a/akka-persistence/src/main/protobuf/MessageFormats.proto b/akka-persistence/src/main/protobuf/MessageFormats.proto index 5de75c419d..b286b56711 100644 --- a/akka-persistence/src/main/protobuf/MessageFormats.proto +++ b/akka-persistence/src/main/protobuf/MessageFormats.proto @@ -14,8 +14,8 @@ message PersistentMessage { // repeated string confirms = 7; // Removed in 2.4 // optional bool confirmable = 8; // Removed in 2.4 // optional DeliveredMessage confirmMessage = 9; // Removed in 2.4 - // optional string confirmTarget = 10; - optional string sender = 11; + // optional string confirmTarget = 10; // Removed in 2.4 + optional string sender = 11; // not stored in journal, needed for remote serialization } message PersistentPayload { diff --git a/akka-persistence/src/main/scala/akka/persistence/Persistent.scala b/akka-persistence/src/main/scala/akka/persistence/Persistent.scala index 28b9b3b3a8..cf8adacf97 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Persistent.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Persistent.scala @@ -72,17 +72,6 @@ trait PersistentRepr extends PersistentEnvelope with Message { */ def sender: ActorRef - /** - * INTERNAL API. - */ - private[persistence] def prepareWrite(sender: ActorRef): PersistentRepr - - /** - * INTERNAL API. - */ - private[persistence] def prepareWrite()(implicit context: ActorContext): PersistentRepr = - prepareWrite(if (sender.isInstanceOf[PromiseActorRef]) context.system.deadLetters else sender) - /** * Creates a new copy of this [[PersistentRepr]]. */ @@ -135,9 +124,6 @@ private[persistence] final case class PersistentImpl( def withPayload(payload: Any): PersistentRepr = copy(payload = payload) - def prepareWrite(sender: ActorRef) = - copy(sender = sender) - def update( sequenceNr: Long, persistenceId: String, diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala index 4dbfb7917e..532bfdadfb 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala @@ -47,7 +47,7 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery { // Send replayed messages and replay result to persistentActor directly. No need // to resequence replayed messages relative to written and looped messages. asyncReplayMessages(persistenceId, fromSequenceNr, toSequenceNr, max) { p ⇒ - if (!p.deleted || replayDeleted) persistentActor.tell(ReplayedMessage(p), p.sender) + if (!p.deleted || replayDeleted) persistentActor.tell(ReplayedMessage(p), Actor.noSender) } map { case _ ⇒ ReplayMessagesSuccess } recover { diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/WriteJournalBase.scala b/akka-persistence/src/main/scala/akka/persistence/journal/WriteJournalBase.scala index dfe847df62..0265dcfbfc 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/WriteJournalBase.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/WriteJournalBase.scala @@ -12,13 +12,8 @@ private[akka] trait WriteJournalBase { this: Actor ⇒ protected def preparePersistentBatch(rb: immutable.Seq[PersistentEnvelope]): immutable.Seq[PersistentRepr] = - rb.filter(persistentPrepareWrite).asInstanceOf[immutable.Seq[PersistentRepr]] // filter instead of flatMap to avoid Some allocations - - private def persistentPrepareWrite(r: PersistentEnvelope): Boolean = r match { - case p: PersistentRepr ⇒ - p.prepareWrite(); true - case _ ⇒ - false - } + rb.collect { + case p: PersistentRepr ⇒ p.update(sender = Actor.noSender) // don't store sender + } } diff --git a/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala b/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala index 169db11cfc..f785ed9fe1 100644 --- a/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala +++ b/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala @@ -17,6 +17,7 @@ import akka.persistence.AtLeastOnceDelivery.{ AtLeastOnceDeliverySnapshot ⇒ At import akka.persistence.AtLeastOnceDelivery.UnconfirmedDelivery import scala.collection.immutable.VectorBuilder import akka.persistence.fsm.PersistentFsmActor.StateChangeEvent +import akka.actor.Actor /** * Marker trait for all protobuf-serializable messages in `akka.persistence`. @@ -119,7 +120,7 @@ class MessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer val builder = PersistentMessage.newBuilder if (persistent.persistenceId != Undefined) builder.setPersistenceId(persistent.persistenceId) - if (persistent.sender != null) builder.setSender(Serialization.serializedActorPath(persistent.sender)) + if (persistent.sender != Actor.noSender) builder.setSender(Serialization.serializedActorPath(persistent.sender)) builder.setPayload(persistentPayloadBuilder(persistent.payload.asInstanceOf[AnyRef])) builder.setSequenceNr(persistent.sequenceNr) @@ -164,7 +165,7 @@ class MessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer persistentMessage.getSequenceNr, if (persistentMessage.hasPersistenceId) persistentMessage.getPersistenceId else Undefined, persistentMessage.getDeleted, - if (persistentMessage.hasSender) system.provider.resolveActorRef(persistentMessage.getSender) else null) + if (persistentMessage.hasSender) system.provider.resolveActorRef(persistentMessage.getSender) else Actor.noSender) } private def payload(persistentPayload: PersistentPayload): Any = { diff --git a/akka-persistence/src/test/scala/akka/persistence/serialization/SerializerSpec.scala b/akka-persistence/src/test/scala/akka/persistence/serialization/SerializerSpec.scala index 36c42869fa..b0ef23e013 100644 --- a/akka-persistence/src/test/scala/akka/persistence/serialization/SerializerSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/serialization/SerializerSpec.scala @@ -148,7 +148,7 @@ class MessageSerializerPersistenceSpec extends AkkaSpec(customSerializers) { "A message serializer" when { "not given a manifest" must { "handle custom Persistent message serialization" in { - val persistent = PersistentRepr(MyPayload("a"), 13, "p1", true, testActor) + val persistent = PersistentRepr(MyPayload("a"), 13, "p1", true) val serializer = serialization.findSerializerFor(persistent) val bytes = serializer.toBinary(persistent) @@ -160,7 +160,7 @@ class MessageSerializerPersistenceSpec extends AkkaSpec(customSerializers) { "given a PersistentRepr manifest" must { "handle custom Persistent message serialization" in { - val persistent = PersistentRepr(MyPayload("b"), 13, "p1", true, testActor) + val persistent = PersistentRepr(MyPayload("b"), 13, "p1", true) val serializer = serialization.findSerializerFor(persistent) val bytes = serializer.toBinary(persistent) @@ -172,7 +172,7 @@ class MessageSerializerPersistenceSpec extends AkkaSpec(customSerializers) { "given payload serializer with string manifest" must { "handle serialization" in { - val persistent = PersistentRepr(MyPayload2("a", 17), 13, "p1", true, testActor) + val persistent = PersistentRepr(MyPayload2("a", 17), 13, "p1", true) val serializer = serialization.findSerializerFor(persistent) val bytes = serializer.toBinary(persistent) @@ -195,7 +195,7 @@ class MessageSerializerPersistenceSpec extends AkkaSpec(customSerializers) { } "be able to deserialize data when class is removed" in { - val serializer = serialization.findSerializerFor(PersistentRepr("x", 13, "p1", true, testActor)) + val serializer = serialization.findSerializerFor(PersistentRepr("x", 13, "p1", true)) // It was created with: // val old = PersistentRepr(OldPayload('A'), 13, "p1", true, testActor) @@ -221,6 +221,31 @@ class MessageSerializerPersistenceSpec extends AkkaSpec(customSerializers) { } } + "given PersistentRepr serialized with Akka 2.3.11 Scala 2.10" must { + "be able to deserialize with latest version" in { + // It was created with: + // val old = PersistentRepr(MyPayload("a"), 13, "p1", true, 3, List("c1", "c2"), confirmable = true, DeliveredByChannel("p2", "c2", 14), testActor, testActor) + // import org.apache.commons.codec.binary.Hex._ + // println(s"encoded persistent: " + String.valueOf(encodeHex(serializer.toBinary(persistent)))) + val oldData = + "0a3208c3da0412022e611a28616b6b612e70657273697374656e63652e73657269616c697a" + + "6174696f6e2e4d795061796c6f6164100d1a027031200130033a0263313a02633240014a0c" + + "0a02703212026332180e20005244616b6b613a2f2f4d65737361676553657269616c697a65" + + "7250657273697374656e6365537065632f73797374656d2f746573744163746f7232232d34" + + "34373233313933375a44616b6b613a2f2f4d65737361676553657269616c697a6572506572" + + "73697374656e6365537065632f73797374656d2f746573744163746f7232232d3434373233" + + "31393337" + + val bytes = decodeHex(oldData.toCharArray) + val expected = PersistentRepr(MyPayload(".a."), 13, "p1", true, Actor.noSender) + val serializer = serialization.findSerializerFor(expected) + val deserialized = serializer.fromBinary(bytes, None).asInstanceOf[PersistentRepr] + deserialized.sender should not be (null) + val deserializedWithoutSender = deserialized.update(sender = Actor.noSender) + deserializedWithoutSender should be(expected) + } + } + "given AtLeastOnceDeliverySnapshot" must { "handle empty unconfirmed" in { val unconfirmed = Vector.empty @@ -254,13 +279,13 @@ class MessageSerializerPersistenceSpec extends AkkaSpec(customSerializers) { object MessageSerializerRemotingSpec { class LocalActor(port: Int) extends Actor { def receive = { - case m ⇒ context.actorSelection(s"akka.tcp://remote@127.0.0.1:${port}/user/remote") tell (m, sender()) + case m ⇒ context.actorSelection(s"akka.tcp://remote@127.0.0.1:${port}/user/remote").tell(m, Actor.noSender) } } class RemoteActor extends Actor { def receive = { - case PersistentRepr(MyPayload(data), _) ⇒ sender() ! s"p${data}" + case p @ PersistentRepr(MyPayload(data), _) ⇒ p.sender ! s"p${data}" } } @@ -271,7 +296,7 @@ object MessageSerializerRemotingSpec { system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress } -class MessageSerializerRemotingSpec extends AkkaSpec(remote.withFallback(customSerializers)) with ImplicitSender with DefaultTimeout { +class MessageSerializerRemotingSpec extends AkkaSpec(remote.withFallback(customSerializers)) with DefaultTimeout { import MessageSerializerRemotingSpec._ val remoteSystem = ActorSystem("remote", remote.withFallback(customSerializers)) @@ -287,7 +312,10 @@ class MessageSerializerRemotingSpec extends AkkaSpec(remote.withFallback(customS "A message serializer" must { "custom-serialize Persistent messages during remoting" in { - localActor ! PersistentRepr(MyPayload("a")) + // this also verifies serialization of PersistentRepr.sender, + // because the RemoteActor will reply to the PersistentRepr.sender + // is kept intact + localActor ! PersistentRepr(MyPayload("a"), sender = testActor) expectMsg("p.a.") } }