From e4b2af37832444acafe143e8d0ee72e2e91335b4 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Sun, 6 Apr 2014 21:33:31 +0200 Subject: [PATCH] =3974 per Persist (serialize) actor refs with transport info * The reason for the problem with NoSuchElementException in ClusterSharding was that actor references were not serialized with full address information. In certain fail over scenarios the references could not be resolved and therefore the ShardRegionTerminated did not match corresponding ShardRegionRegistered. * Wrap serialization with transport information from defaultAddress (cherry picked from commit 3e73ae5925cf1293a9a5d61e48919b1708e84df2) --- .../contrib/pattern/ClusterSharding.scala | 8 +++- .../serialization/MessageSerializer.scala | 26 ++++++++++--- .../serialization/SnapshotSerializer.scala | 38 +++++++++++++------ 3 files changed, 53 insertions(+), 19 deletions(-) diff --git a/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSharding.scala b/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSharding.scala index fcd17db043..c18dc64190 100644 --- a/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSharding.scala +++ b/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSharding.scala @@ -1198,7 +1198,13 @@ class ShardCoordinator(handOffTimeout: FiniteDuration, rebalanceInterval: Finite case ShardRegionProxyRegistered(proxy) ⇒ persistentState = persistentState.updated(evt) case ShardRegionTerminated(region) ⇒ - persistentState = persistentState.updated(evt) + if (persistentState.regions.contains(region)) + persistentState = persistentState.updated(evt) + else { + log.debug("ShardRegionTerminated, but region {} was not registered. This inconsistency is due to that " + + " some stored ActorRef in Akka v2.3.0 and v2.3.1 did not contain full address information. It will be " + + "removed by later watch.", region) + } case ShardRegionProxyTerminated(proxy) ⇒ persistentState = persistentState.updated(evt) case _: ShardHomeAllocated ⇒ diff --git a/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala b/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala index 1e32a0c8b3..a2be6b30d5 100644 --- a/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala +++ b/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala @@ -36,6 +36,12 @@ class MessageSerializer(val system: ExtendedActorSystem) extends Serializer { def identifier: Int = 7 def includeManifest: Boolean = true + private lazy val transportInformation: Option[Serialization.Information] = { + val address = system.provider.getDefaultAddress + if (address.hasLocalScope) None + else Some(Serialization.Information(address, system)) + } + /** * Serializes [[PersistentBatch]], [[PersistentRepr]] and [[Deliver]] messages. Delegates * serialization of a persistent message's payload to a matching `akka.serialization.Serializer`. @@ -103,14 +109,22 @@ class MessageSerializer(val system: ExtendedActorSystem) extends Serializer { } private def persistentPayloadBuilder(payload: AnyRef) = { - val serializer = SerializationExtension(system).findSerializerFor(payload) - val builder = PersistentPayload.newBuilder() + def payloadBuilder() = { + val serializer = SerializationExtension(system).findSerializerFor(payload) + val builder = PersistentPayload.newBuilder() - if (serializer.includeManifest) builder.setPayloadManifest((ByteString.copyFromUtf8(payload.getClass.getName))) + if (serializer.includeManifest) builder.setPayloadManifest((ByteString.copyFromUtf8(payload.getClass.getName))) - builder.setPayload(ByteString.copyFrom(serializer.toBinary(payload))) - builder.setSerializerId(serializer.identifier) - builder + builder.setPayload(ByteString.copyFrom(serializer.toBinary(payload))) + builder.setSerializerId(serializer.identifier) + builder + } + + // serialize actor references with full address information (defaultAddress) + transportInformation match { + case Some(ti) ⇒ Serialization.currentTransportInformation.withValue(ti) { payloadBuilder() } + case None ⇒ payloadBuilder() + } } private def deliveredMessageBuilder(delivered: Delivered) = { diff --git a/akka-persistence/src/main/scala/akka/persistence/serialization/SnapshotSerializer.scala b/akka-persistence/src/main/scala/akka/persistence/serialization/SnapshotSerializer.scala index f563dd3ce5..d2ffb0637b 100644 --- a/akka-persistence/src/main/scala/akka/persistence/serialization/SnapshotSerializer.scala +++ b/akka-persistence/src/main/scala/akka/persistence/serialization/SnapshotSerializer.scala @@ -6,9 +6,9 @@ package akka.persistence.serialization import java.io._ - import akka.actor._ import akka.serialization.{ Serializer, SerializationExtension } +import akka.serialization.Serialization /** * Wrapper for snapshot `data`. Snapshot `data` are the actual snapshot objects captured by @@ -32,6 +32,12 @@ class SnapshotSerializer(system: ExtendedActorSystem) extends Serializer { def identifier: Int = 8 def includeManifest: Boolean = false + private lazy val transportInformation: Option[Serialization.Information] = { + val address = system.provider.getDefaultAddress + if (address.hasLocalScope) None + else Some(Serialization.Information(address, system)) + } + /** * Serializes a [[Snapshot]]. Delegates serialization of snapshot `data` to a matching * `akka.serialization.Serializer`. @@ -49,22 +55,30 @@ class SnapshotSerializer(system: ExtendedActorSystem) extends Serializer { Snapshot(snapshotFromBinary(bytes)) private def snapshotToBinary(snapshot: AnyRef): Array[Byte] = { - val extension = SerializationExtension(system) + def serialize() = { + val extension = SerializationExtension(system) - val snapshotSerializer = extension.findSerializerFor(snapshot) - val snapshotManifest = if (snapshotSerializer.includeManifest) Some(snapshot.getClass.getName) else None + val snapshotSerializer = extension.findSerializerFor(snapshot) + val snapshotManifest = if (snapshotSerializer.includeManifest) Some(snapshot.getClass.getName) else None - val header = SnapshotHeader(snapshotSerializer.identifier, snapshotManifest) - val headerSerializer = extension.findSerializerFor(header) - val headerBytes = headerSerializer.toBinary(header) + val header = SnapshotHeader(snapshotSerializer.identifier, snapshotManifest) + val headerSerializer = extension.findSerializerFor(header) + val headerBytes = headerSerializer.toBinary(header) - val out = new ByteArrayOutputStream + val out = new ByteArrayOutputStream - writeInt(out, headerBytes.length) + writeInt(out, headerBytes.length) - out.write(headerBytes) - out.write(snapshotSerializer.toBinary(snapshot)) - out.toByteArray + out.write(headerBytes) + out.write(snapshotSerializer.toBinary(snapshot)) + out.toByteArray + } + + // serialize actor references with full address information (defaultAddress) + transportInformation match { + case Some(ti) ⇒ Serialization.currentTransportInformation.withValue(ti) { serialize() } + case None ⇒ serialize() + } } private def snapshotFromBinary(bytes: Array[Byte]): AnyRef = {