diff --git a/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSharding.scala b/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSharding.scala index fcd17db043..c18dc64190 100644 --- a/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSharding.scala +++ b/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSharding.scala @@ -1198,7 +1198,13 @@ class ShardCoordinator(handOffTimeout: FiniteDuration, rebalanceInterval: Finite case ShardRegionProxyRegistered(proxy) ⇒ persistentState = persistentState.updated(evt) case ShardRegionTerminated(region) ⇒ - persistentState = persistentState.updated(evt) + if (persistentState.regions.contains(region)) + persistentState = persistentState.updated(evt) + else { + log.debug("ShardRegionTerminated, but region {} was not registered. This inconsistency is due to that " + + " some stored ActorRef in Akka v2.3.0 and v2.3.1 did not contain full address information. It will be " + + "removed by later watch.", region) + } case ShardRegionProxyTerminated(proxy) ⇒ persistentState = persistentState.updated(evt) case _: ShardHomeAllocated ⇒ diff --git a/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala b/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala index 1e32a0c8b3..a2be6b30d5 100644 --- a/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala +++ b/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala @@ -36,6 +36,12 @@ class MessageSerializer(val system: ExtendedActorSystem) extends Serializer { def identifier: Int = 7 def includeManifest: Boolean = true + private lazy val transportInformation: Option[Serialization.Information] = { + val address = system.provider.getDefaultAddress + if (address.hasLocalScope) None + else Some(Serialization.Information(address, system)) + } + /** * Serializes [[PersistentBatch]], [[PersistentRepr]] and [[Deliver]] messages. Delegates * serialization of a persistent message's payload to a matching `akka.serialization.Serializer`. @@ -103,14 +109,22 @@ class MessageSerializer(val system: ExtendedActorSystem) extends Serializer { } private def persistentPayloadBuilder(payload: AnyRef) = { - val serializer = SerializationExtension(system).findSerializerFor(payload) - val builder = PersistentPayload.newBuilder() + def payloadBuilder() = { + val serializer = SerializationExtension(system).findSerializerFor(payload) + val builder = PersistentPayload.newBuilder() - if (serializer.includeManifest) builder.setPayloadManifest((ByteString.copyFromUtf8(payload.getClass.getName))) + if (serializer.includeManifest) builder.setPayloadManifest((ByteString.copyFromUtf8(payload.getClass.getName))) - builder.setPayload(ByteString.copyFrom(serializer.toBinary(payload))) - builder.setSerializerId(serializer.identifier) - builder + builder.setPayload(ByteString.copyFrom(serializer.toBinary(payload))) + builder.setSerializerId(serializer.identifier) + builder + } + + // serialize actor references with full address information (defaultAddress) + transportInformation match { + case Some(ti) ⇒ Serialization.currentTransportInformation.withValue(ti) { payloadBuilder() } + case None ⇒ payloadBuilder() + } } private def deliveredMessageBuilder(delivered: Delivered) = { diff --git a/akka-persistence/src/main/scala/akka/persistence/serialization/SnapshotSerializer.scala b/akka-persistence/src/main/scala/akka/persistence/serialization/SnapshotSerializer.scala index f563dd3ce5..d2ffb0637b 100644 --- a/akka-persistence/src/main/scala/akka/persistence/serialization/SnapshotSerializer.scala +++ b/akka-persistence/src/main/scala/akka/persistence/serialization/SnapshotSerializer.scala @@ -6,9 +6,9 @@ package akka.persistence.serialization import java.io._ - import akka.actor._ import akka.serialization.{ Serializer, SerializationExtension } +import akka.serialization.Serialization /** * Wrapper for snapshot `data`. Snapshot `data` are the actual snapshot objects captured by @@ -32,6 +32,12 @@ class SnapshotSerializer(system: ExtendedActorSystem) extends Serializer { def identifier: Int = 8 def includeManifest: Boolean = false + private lazy val transportInformation: Option[Serialization.Information] = { + val address = system.provider.getDefaultAddress + if (address.hasLocalScope) None + else Some(Serialization.Information(address, system)) + } + /** * Serializes a [[Snapshot]]. Delegates serialization of snapshot `data` to a matching * `akka.serialization.Serializer`. @@ -49,22 +55,30 @@ class SnapshotSerializer(system: ExtendedActorSystem) extends Serializer { Snapshot(snapshotFromBinary(bytes)) private def snapshotToBinary(snapshot: AnyRef): Array[Byte] = { - val extension = SerializationExtension(system) + def serialize() = { + val extension = SerializationExtension(system) - val snapshotSerializer = extension.findSerializerFor(snapshot) - val snapshotManifest = if (snapshotSerializer.includeManifest) Some(snapshot.getClass.getName) else None + val snapshotSerializer = extension.findSerializerFor(snapshot) + val snapshotManifest = if (snapshotSerializer.includeManifest) Some(snapshot.getClass.getName) else None - val header = SnapshotHeader(snapshotSerializer.identifier, snapshotManifest) - val headerSerializer = extension.findSerializerFor(header) - val headerBytes = headerSerializer.toBinary(header) + val header = SnapshotHeader(snapshotSerializer.identifier, snapshotManifest) + val headerSerializer = extension.findSerializerFor(header) + val headerBytes = headerSerializer.toBinary(header) - val out = new ByteArrayOutputStream + val out = new ByteArrayOutputStream - writeInt(out, headerBytes.length) + writeInt(out, headerBytes.length) - out.write(headerBytes) - out.write(snapshotSerializer.toBinary(snapshot)) - out.toByteArray + out.write(headerBytes) + out.write(snapshotSerializer.toBinary(snapshot)) + out.toByteArray + } + + // serialize actor references with full address information (defaultAddress) + transportInformation match { + case Some(ti) ⇒ Serialization.currentTransportInformation.withValue(ti) { serialize() } + case None ⇒ serialize() + } } private def snapshotFromBinary(bytes: Array[Byte]): AnyRef = {