From e6633f17fac9b2fe1100af73b18add3ac24ad0df Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Mon, 21 May 2018 16:59:04 +0200 Subject: [PATCH] Make sure Serialization.currentTransportInformation is always set, #25067 * The ThreadLocal Serialization.currentTransportInformation is used for serializing local actor refs, but it's also useful when a serializer library e.g. custom serializer/deserializer in Jackson need access to the current ActorSystem. * We set this in a rather ad-hoc way from remoting and in some persistence plugins, but it's only set for serialization and not deserialization, and it's easy for Persistence plugins or other libraries to forget this when using Akka serialization directly. * This change is automatically setting the info when using the ordinary serialize and deserialize methods. * It's also set when LocalActorRefProvider, which wasn't always the case previously. * Keep a cached instance of Serialization.Information in the provider to avoid creating new instances all the time. * Added optional Persistence TCK tests to verify that the plugin is setting this if it's using some custom calls to the serializer. --- .../test/scala/akka/actor/ActorRefSpec.scala | 2 +- .../mima-filters/2.5.12.backwards.excludes | 5 +- .../src/main/scala/akka/actor/ActorRef.scala | 2 +- .../scala/akka/actor/ActorRefProvider.scala | 33 +++- .../main/scala/akka/actor/TypedActor.scala | 5 +- .../scala/akka/actor/dungeon/Children.scala | 16 +- .../scala/akka/actor/dungeon/Dispatch.scala | 15 +- .../akka/serialization/Serialization.scala | 161 ++++++++++++------ .../scala/akka/serialization/Serializer.scala | 4 +- .../ddata/protobuf/SerializationSupport.scala | 11 +- .../PersistencePluginDocSpec.scala | 9 +- .../akka/persistence/CapabilityFlags.scala | 12 +- .../akka/persistence/TestSerializer.scala | 50 ++++++ .../japi/journal/JavaJournalPerfSpec.scala | 2 + .../japi/journal/JavaJournalSpec.scala | 2 + .../japi/snapshot/JavaSnapshotStoreSpec.scala | 7 +- .../persistence/journal/JournalSpec.scala | 40 ++++- .../snapshot/SnapshotStoreSpec.scala | 38 ++++- .../leveldb/LeveldbJournalJavaSpec.scala | 2 + .../LeveldbJournalNativePerfSpec.scala | 2 + .../leveldb/LeveldbJournalNativeSpec.scala | 2 + ...nalNoAtomicPersistMultipleEventsSpec.scala | 2 + .../local/LocalSnapshotStoreSpec.scala | 7 +- .../serialization/MessageSerializer.scala | 17 +- .../serialization/SnapshotSerializer.scala | 11 +- .../snapshot/local/LocalSnapshotStore.scala | 5 +- .../RemoteRestartedQuarantinedSpec.scala | 1 - .../scala/akka/remote/MessageSerializer.scala | 27 ++- .../akka/remote/RemoteActorRefProvider.scala | 21 ++- .../akka/remote/artery/ArteryTransport.scala | 2 +- .../akka/remote/artery/Association.scala | 2 + .../scala/akka/remote/artery/Codecs.scala | 52 +++--- .../remote/artery/EnvelopeBufferPool.scala | 11 +- ...erializationTransportInformationSpec.scala | 10 ++ ...erializationTransportInformationSpec.scala | 135 +++++++++++++++ .../stream/impl/streamref/SourceRefImpl.scala | 2 +- 36 files changed, 579 insertions(+), 146 deletions(-) create mode 100644 akka-persistence-tck/src/main/scala/akka/persistence/TestSerializer.scala create mode 100644 akka-remote/src/test/scala/akka/remote/artery/SerializationTransportInformationSpec.scala create mode 100644 akka-remote/src/test/scala/akka/remote/serialization/SerializationTransportInformationSpec.scala diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala index befa20f00d..ff702530b8 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala @@ -313,7 +313,7 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout { (intercept[java.lang.IllegalStateException] { in.readObject }).getMessage should ===("Trying to deserialize a serialized ActorRef without an ActorSystem in scope." + - " Use 'akka.serialization.Serialization.currentSystem.withValue(system) { ... }'") + " Use 'akka.serialization.JavaSerializer.currentSystem.withValue(system) { ... }'") } "return EmptyLocalActorRef on deserialize if not present in actor hierarchy (and remoting is not enabled)" in { diff --git a/akka-actor/src/main/mima-filters/2.5.12.backwards.excludes b/akka-actor/src/main/mima-filters/2.5.12.backwards.excludes index 0d0fd144f2..7cd4c8ad44 100644 --- a/akka-actor/src/main/mima-filters/2.5.12.backwards.excludes +++ b/akka-actor/src/main/mima-filters/2.5.12.backwards.excludes @@ -1,3 +1,6 @@ +# #25067 Serialization.Information +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.actor.ActorRefProvider.serializationInformation") + # #24646 java.time.Duration ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.actor.AbstractActor#ActorContext.cancelReceiveTimeout") -ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.actor.AbstractActor#ActorContext.setReceiveTimeout") \ No newline at end of file +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.actor.AbstractActor#ActorContext.setReceiveTimeout") diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 035faa3c6b..a869f5366c 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -422,7 +422,7 @@ private[akka] final case class SerializedActorRef private (path: String) { case null ⇒ throw new IllegalStateException( "Trying to deserialize a serialized ActorRef without an ActorSystem in scope." + - " Use 'akka.serialization.Serialization.currentSystem.withValue(system) { ... }'") + " Use 'akka.serialization.JavaSerializer.currentSystem.withValue(system) { ... }'") case someSystem ⇒ someSystem.provider.resolveActorRef(path) } diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index 8e93e6f2ae..62361ea02e 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -5,23 +5,30 @@ package akka.actor import akka.dispatch.sysmsg._ -import akka.dispatch.{ UnboundedMessageQueueSemantics, RequiresMessageQueue } +import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics } import akka.routing._ import akka.event._ -import akka.util.{ Helpers } +import akka.util.Helpers import akka.japi.Util.immutableSeq import akka.util.Collections.EmptyImmutableSeq import scala.util.control.NonFatal import java.util.concurrent.atomic.AtomicLong + import scala.concurrent.{ ExecutionContextExecutor, Future, Promise } import scala.annotation.implicitNotFound + import akka.ConfigurationException +import akka.annotation.DoNotInherit +import akka.annotation.InternalApi import akka.dispatch.Mailboxes +import akka.serialization.Serialization +import akka.util.OptionVal /** * Interface for all ActorRef providers to implement. + * Not intended for extension outside of Akka. */ -trait ActorRefProvider { +@DoNotInherit trait ActorRefProvider { /** * Reference to the supervisor of guardian and systemGuardian; this is @@ -179,6 +186,9 @@ trait ActorRefProvider { * Obtain the external address of the default transport. */ def getDefaultAddress: Address + + /** INTERNAL API */ + @InternalApi private[akka] def serializationInformation: Serialization.Information } /** @@ -795,4 +805,21 @@ private[akka] class LocalActorRefProvider private[akka] ( def getExternalAddressFor(addr: Address): Option[Address] = if (addr == rootPath.address) Some(addr) else None def getDefaultAddress: Address = rootPath.address + + // no need for volatile, only intended as cached value, not necessarily a singleton value + private var serializationInformationCache: OptionVal[Serialization.Information] = OptionVal.None + @InternalApi override private[akka] def serializationInformation: Serialization.Information = { + Serialization.Information(getDefaultAddress, system) + serializationInformationCache match { + case OptionVal.Some(info) ⇒ info + case OptionVal.None ⇒ + if (system eq null) + throw new IllegalStateException("Too early access of serializationInformation") + else { + val info = Serialization.Information(rootPath.address, system) + serializationInformationCache = OptionVal.Some(info) + info + } + } + } } diff --git a/akka-actor/src/main/scala/akka/actor/TypedActor.scala b/akka-actor/src/main/scala/akka/actor/TypedActor.scala index 24933c9a9a..3dafdf70aa 100644 --- a/akka-actor/src/main/scala/akka/actor/TypedActor.scala +++ b/akka-actor/src/main/scala/akka/actor/TypedActor.scala @@ -177,7 +177,7 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi val system = akka.serialization.JavaSerializer.currentSystem.value if (system eq null) throw new IllegalStateException( "Trying to deserialize a SerializedMethodCall without an ActorSystem in scope." + - " Use akka.serialization.Serialization.currentSystem.withValue(system) { ... }") + " Use akka.serialization.JavaSerializer.currentSystem.withValue(system) { ... }") val serialization = SerializationExtension(system) MethodCall(ownerType.getDeclaredMethod(methodName, parameterTypes: _*), serializedParameters match { case null ⇒ null @@ -443,7 +443,8 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi */ private[akka] final case class SerializedTypedActorInvocationHandler(val actor: ActorRef, val timeout: FiniteDuration) { @throws(classOf[ObjectStreamException]) private def readResolve(): AnyRef = JavaSerializer.currentSystem.value match { - case null ⇒ throw new IllegalStateException("SerializedTypedActorInvocationHandler.readResolve requires that JavaSerializer.currentSystem.value is set to a non-null value") + case null ⇒ throw new IllegalStateException("SerializedTypedActorInvocationHandler.readResolve requires that " + + "JavaSerializer.currentSystem.value is set to a non-null value") case some ⇒ toTypedActorInvocationHandler(some) } diff --git a/akka-actor/src/main/scala/akka/actor/dungeon/Children.scala b/akka-actor/src/main/scala/akka/actor/dungeon/Children.scala index 1f22a70f67..edc45ff73f 100644 --- a/akka-actor/src/main/scala/akka/actor/dungeon/Children.scala +++ b/akka-actor/src/main/scala/akka/actor/dungeon/Children.scala @@ -7,8 +7,9 @@ package akka.actor.dungeon import scala.annotation.tailrec import scala.util.control.NonFatal import scala.collection.immutable + import akka.actor._ -import akka.serialization.{ SerializationExtension, Serializers } +import akka.serialization.{ Serialization, SerializationExtension, Serializers } import akka.util.{ Helpers, Unsafe } import java.util.Optional @@ -241,13 +242,16 @@ private[akka] trait Children { this: ActorCell ⇒ } private def makeChild(cell: ActorCell, props: Props, name: String, async: Boolean, systemService: Boolean): ActorRef = { - if (cell.system.settings.SerializeAllCreators && !systemService && props.deploy.scope != LocalScope) + if (cell.system.settings.SerializeAllCreators && !systemService && props.deploy.scope != LocalScope) { + val oldInfo = Serialization.currentTransportInformation.value try { val ser = SerializationExtension(cell.system) + if (oldInfo eq null) + Serialization.currentTransportInformation.value = system.provider.serializationInformation + props.args forall (arg ⇒ arg == null || - arg.isInstanceOf[NoSerializationVerificationNeeded] || - { + arg.isInstanceOf[NoSerializationVerificationNeeded] || { val o = arg.asInstanceOf[AnyRef] val serializer = ser.findSerializerFor(o) val bytes = serializer.toBinary(o) @@ -256,7 +260,9 @@ private[akka] trait Children { this: ActorCell ⇒ }) } catch { case NonFatal(e) ⇒ throw new IllegalArgumentException(s"pre-creation serialization check failed at [${cell.self.path}/$name]", e) - } + } finally Serialization.currentTransportInformation.value = oldInfo + } + /* * in case we are currently terminating, fail external attachChild requests * (internal calls cannot happen anyway because we are suspended) diff --git a/akka-actor/src/main/scala/akka/actor/dungeon/Dispatch.scala b/akka-actor/src/main/scala/akka/actor/dungeon/Dispatch.scala index 3d087f624c..3c78e417a1 100644 --- a/akka-actor/src/main/scala/akka/actor/dungeon/Dispatch.scala +++ b/akka-actor/src/main/scala/akka/actor/dungeon/Dispatch.scala @@ -5,6 +5,7 @@ package akka.actor.dungeon import scala.annotation.tailrec + import akka.AkkaException import akka.dispatch.{ Envelope, Mailbox } import akka.dispatch.sysmsg._ @@ -12,12 +13,13 @@ import akka.event.Logging.Error import akka.util.Unsafe import akka.actor._ import akka.serialization.{ DisabledJavaSerializer, SerializationExtension, Serializers } - import scala.util.control.{ NoStackTrace, NonFatal } import scala.util.control.Exception.Catcher + import akka.dispatch.MailboxType import akka.dispatch.ProducesMessageQueue import akka.dispatch.UnboundedMailbox +import akka.serialization.Serialization @SerialVersionUID(1L) final case class SerializationCheckFailedException private (msg: Object, cause: Throwable) @@ -169,9 +171,14 @@ private[akka] trait Dispatch { this: ActorCell ⇒ if (serializer.isInstanceOf[DisabledJavaSerializer] && !s.shouldWarnAboutJavaSerializer(obj.getClass, serializer)) obj // skip check for known "local" messages else { - val bytes = serializer.toBinary(obj) - val ms = Serializers.manifestFor(serializer, obj) - s.deserialize(bytes, serializer.identifier, ms).get + val oldInfo = Serialization.currentTransportInformation.value + try { + if (oldInfo eq null) + Serialization.currentTransportInformation.value = system.provider.serializationInformation + val bytes = serializer.toBinary(obj) + val ms = Serializers.manifestFor(serializer, obj) + s.deserialize(bytes, serializer.identifier, ms).get + } finally Serialization.currentTransportInformation.value = oldInfo } } diff --git a/akka-actor/src/main/scala/akka/serialization/Serialization.scala b/akka-actor/src/main/scala/akka/serialization/Serialization.scala index 08741d30b5..c6be3b16c3 100644 --- a/akka-actor/src/main/scala/akka/serialization/Serialization.scala +++ b/akka-actor/src/main/scala/akka/serialization/Serialization.scala @@ -31,11 +31,11 @@ object Serialization { type ClassSerializer = (Class[_], Serializer) /** - * This holds a reference to the current transport serialization information used for - * serializing local actor refs. - * INTERNAL API + * INTERNAL API: This holds a reference to the current transport serialization information used for + * serializing local actor refs, or if serializer library e.g. custom serializer/deserializer in + * Jackson need access to the current `ActorSystem`. */ - private[akka] val currentTransportInformation = new DynamicVariable[Information](null) + @InternalApi private[akka] val currentTransportInformation = new DynamicVariable[Information](null) class Settings(val config: Config) { val Serializers: Map[String, String] = configToMap(config.getConfig("akka.actor.serializers")) @@ -66,16 +66,11 @@ object Serialization { } } - /** - * Serialization information needed for serializing local actor refs. - * INTERNAL API - */ - private[akka] final case class Information(address: Address, system: ActorSystem) - /** * The serialized path of an actorRef, based on the current transport serialization information. - * If there is no external address available for the requested address then the systems default - * address will be used. + * If there is no external address available in the given `ActorRef` then the systems default + * address will be used and that is retrieved from the ThreadLocal `Serialization.Information` + * that was set with [[Serialization#withTransportInformation]]. */ def serializedActorPath(actorRef: ActorRef): String = { val path = actorRef.path @@ -101,20 +96,47 @@ object Serialization { } /** - * Use the specified @param system to determine transport information that will be used when serializing actorRefs - * in @param f code: if there is no external address available for the requested address then the systems default - * address will be used. + * Serialization information needed for serializing local actor refs, + * or if serializer library e.g. custom serializer/deserializer in Jackson need + * access to the current `ActorSystem`. + */ + final case class Information(address: Address, system: ActorSystem) + + /** + * Sets serialization information in a `ThreadLocal` and runs `f`. The information is + * needed for serializing local actor refs, or if serializer library e.g. custom serializer/deserializer + * in Jackson need access to the current `ActorSystem`. The current [[Information]] can be accessed within + * `f` via [[Serialization#getCurrentTransportInformation]]. * - * @return value returned by @param f + * Akka Remoting sets this value when serializing and deserializing messages, and when using + * the ordinary `serialize` and `deserialize` methods in [[Serialization]] the value is also + * set automatically. + * + * @return value returned by `f` */ def withTransportInformation[T](system: ExtendedActorSystem)(f: () ⇒ T): T = { - val address = system.provider.getDefaultAddress - if (address.hasLocalScope) { - f() - } else { - Serialization.currentTransportInformation.withValue(Serialization.Information(address, system)) { + val info = system.provider.serializationInformation + if (Serialization.currentTransportInformation.value eq info) + f() // already set + else + Serialization.currentTransportInformation.withValue(info) { f() } + } + + /** + * Gets the serialization information from a `ThreadLocal` that was assigned via + * [[Serialization#withTransportInformation]]. The information is needed for serializing + * local actor refs, or if serializer library e.g. custom serializer/deserializer + * in Jackson need access to the current `ActorSystem`. + * + * @throws IllegalStateException if the information was not set + */ + def getCurrentTransportInformation(): Information = { + Serialization.currentTransportInformation.value match { + case null ⇒ throw new IllegalStateException( + "currentTransportInformation is not set, use Serialization.withTransportInformation") + case t ⇒ t } } @@ -134,11 +156,28 @@ class Serialization(val system: ExtendedActorSystem) extends Extension { val log: LoggingAdapter = _log private val manifestCache = new AtomicReference[Map[String, Option[Class[_]]]](Map.empty[String, Option[Class[_]]]) + /** INTERNAL API */ + @InternalApi private[akka] def serializationInformation: Serialization.Information = + system.provider.serializationInformation + + private def withTransportInformation[T](f: () ⇒ T): T = { + val oldInfo = Serialization.currentTransportInformation.value + try { + if (oldInfo eq null) + Serialization.currentTransportInformation.value = serializationInformation + f() + } finally Serialization.currentTransportInformation.value = oldInfo + } + /** * Serializes the given AnyRef/java.lang.Object according to the Serialization configuration * to either an Array of Bytes or an Exception if one was thrown. */ - def serialize(o: AnyRef): Try[Array[Byte]] = Try(findSerializerFor(o).toBinary(o)) + def serialize(o: AnyRef): Try[Array[Byte]] = { + withTransportInformation { () ⇒ + Try(findSerializerFor(o).toBinary(o)) + } + } /** * Deserializes the given array of bytes using the specified serializer id, @@ -152,7 +191,9 @@ class Serialization(val system: ExtendedActorSystem) extends Extension { s"Cannot find serializer with id [$serializerId]. The most probable reason is that the configuration entry " + "akka.actor.serializers is not in synch between the two systems.") } - serializer.fromBinary(bytes, clazz).asInstanceOf[T] + withTransportInformation { () ⇒ + serializer.fromBinary(bytes, clazz).asInstanceOf[T] + } } /** @@ -177,27 +218,29 @@ class Serialization(val system: ExtendedActorSystem) extends Extension { updateCache(manifestCache.get, key, value) // recursive, try again } - serializer match { - case s2: SerializerWithStringManifest ⇒ s2.fromBinary(bytes, manifest) - case s1 ⇒ - if (manifest == "") - s1.fromBinary(bytes, None) - else { - val cache = manifestCache.get - cache.get(manifest) match { - case Some(cachedClassManifest) ⇒ s1.fromBinary(bytes, cachedClassManifest) - case None ⇒ - system.dynamicAccess.getClassFor[AnyRef](manifest) match { - case Success(classManifest) ⇒ - val classManifestOption: Option[Class[_]] = Some(classManifest) - updateCache(cache, manifest, classManifestOption) - s1.fromBinary(bytes, classManifestOption) - case Failure(e) ⇒ - throw new NotSerializableException( - s"Cannot find manifest class [$manifest] for serializer with id [${serializer.identifier}].") - } + withTransportInformation { () ⇒ + serializer match { + case s2: SerializerWithStringManifest ⇒ s2.fromBinary(bytes, manifest) + case s1 ⇒ + if (manifest == "") + s1.fromBinary(bytes, None) + else { + val cache = manifestCache.get + cache.get(manifest) match { + case Some(cachedClassManifest) ⇒ s1.fromBinary(bytes, cachedClassManifest) + case None ⇒ + system.dynamicAccess.getClassFor[AnyRef](manifest) match { + case Success(classManifest) ⇒ + val classManifestOption: Option[Class[_]] = Some(classManifest) + updateCache(cache, manifest, classManifestOption) + s1.fromBinary(bytes, classManifestOption) + case Failure(e) ⇒ + throw new NotSerializableException( + s"Cannot find manifest class [$manifest] for serializer with id [${serializer.identifier}].") + } + } } - } + } } } @@ -213,22 +256,34 @@ class Serialization(val system: ExtendedActorSystem) extends Extension { s"Cannot find serializer with id [$serializerId]. The most probable reason is that the configuration entry " + "akka.actor.serializers is not in synch between the two systems.") } - serializer match { - case ser: ByteBufferSerializer ⇒ - ser.fromBinary(buf, manifest) - case _ ⇒ - val bytes = new Array[Byte](buf.remaining()) - buf.get(bytes) - deserializeByteArray(bytes, serializer, manifest) - } + + // not using `withTransportInformation { () =>` because deserializeByteBuffer is supposed to be the + // possibility for allocation free serialization + val oldInfo = Serialization.currentTransportInformation.value + try { + if (oldInfo eq null) + Serialization.currentTransportInformation.value = serializationInformation + + serializer match { + case ser: ByteBufferSerializer ⇒ + ser.fromBinary(buf, manifest) + case _ ⇒ + val bytes = new Array[Byte](buf.remaining()) + buf.get(bytes) + deserializeByteArray(bytes, serializer, manifest) + } + } finally Serialization.currentTransportInformation.value = oldInfo } /** * Deserializes the given array of bytes using the specified type to look up what Serializer should be used. * Returns either the resulting object or an Exception if one was thrown. */ - def deserialize[T](bytes: Array[Byte], clazz: Class[T]): Try[T] = - Try(serializerFor(clazz).fromBinary(bytes, Some(clazz)).asInstanceOf[T]) + def deserialize[T](bytes: Array[Byte], clazz: Class[T]): Try[T] = { + withTransportInformation { () ⇒ + Try(serializerFor(clazz).fromBinary(bytes, Some(clazz)).asInstanceOf[T]) + } + } /** * Returns the Serializer configured for the given object, returns the NullSerializer if it's null. diff --git a/akka-actor/src/main/scala/akka/serialization/Serializer.scala b/akka-actor/src/main/scala/akka/serialization/Serializer.scala index 33ce6318ef..49a84479c3 100644 --- a/akka-actor/src/main/scala/akka/serialization/Serializer.scala +++ b/akka-actor/src/main/scala/akka/serialization/Serializer.scala @@ -288,13 +288,13 @@ object JavaSerializer { * If you are using Serializers yourself, outside of SerializationExtension, * you'll need to surround the serialization/deserialization with: * - * currentSystem.withValue(system) { + * JavaSerializer.currentSystem.withValue(system) { * ...code... * } * * or * - * currentSystem.withValue(system, callable) + * JavaSerializer.currentSystem.withValue(system, callable) */ val currentSystem = new CurrentSystem final class CurrentSystem extends DynamicVariable[ExtendedActorSystem](null) { diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/SerializationSupport.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/SerializationSupport.scala index 8cdd4ae502..8533e5e63a 100644 --- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/SerializationSupport.scala +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/SerializationSupport.scala @@ -149,11 +149,14 @@ trait SerializationSupport { // Serialize actor references with full address information (defaultAddress). // When sending remote messages currentTransportInformation is already set, - // but when serializing for digests it must be set here. - if (Serialization.currentTransportInformation.value == null) - Serialization.currentTransportInformation.withValue(transportInformation) { buildOther() } - else + // but when serializing for digests or DurableStore it must be set here. + val oldInfo = Serialization.currentTransportInformation.value + try { + if (oldInfo eq null) + Serialization.currentTransportInformation.value = system.provider.serializationInformation buildOther() + } finally Serialization.currentTransportInformation.value = oldInfo + } def otherMessageFromBinary(bytes: Array[Byte]): AnyRef = diff --git a/akka-docs/src/test/scala/docs/persistence/PersistencePluginDocSpec.scala b/akka-docs/src/test/scala/docs/persistence/PersistencePluginDocSpec.scala index 24dcd24c55..2f17159fc0 100644 --- a/akka-docs/src/test/scala/docs/persistence/PersistencePluginDocSpec.scala +++ b/akka-docs/src/test/scala/docs/persistence/PersistencePluginDocSpec.scala @@ -193,6 +193,9 @@ object PersistenceTCKDoc { override def supportsRejectingNonSerializableObjects: CapabilityFlag = false // or CapabilityFlag.off + + override def supportsSerialization: CapabilityFlag = + true // or CapabilityFlag.on } //#journal-tck-scala } @@ -204,7 +207,11 @@ object PersistenceTCKDoc { config = ConfigFactory.parseString( """ akka.persistence.snapshot-store.plugin = "my.snapshot-store.plugin" - """)) + """)) { + + override def supportsSerialization: CapabilityFlag = + true // or CapabilityFlag.on + } //#snapshot-store-tck-scala } new AnyRef { diff --git a/akka-persistence-tck/src/main/scala/akka/persistence/CapabilityFlags.scala b/akka-persistence-tck/src/main/scala/akka/persistence/CapabilityFlags.scala index 59fd64bbe3..40da998bfd 100644 --- a/akka-persistence-tck/src/main/scala/akka/persistence/CapabilityFlags.scala +++ b/akka-persistence-tck/src/main/scala/akka/persistence/CapabilityFlags.scala @@ -44,11 +44,21 @@ trait JournalCapabilityFlags extends CapabilityFlags { */ protected def supportsRejectingNonSerializableObjects: CapabilityFlag + /** + * When `true` enables tests which check if the Journal properly serialize and + * deserialize events. + */ + protected def supportsSerialization: CapabilityFlag + } //#journal-flags //#snapshot-store-flags trait SnapshotStoreCapabilityFlags extends CapabilityFlags { - // no flags currently + /** + * When `true` enables tests which check if the snapshot store properly serialize and + * deserialize snapshots. + */ + protected def supportsSerialization: CapabilityFlag } //#snapshot-store-flags diff --git a/akka-persistence-tck/src/main/scala/akka/persistence/TestSerializer.scala b/akka-persistence-tck/src/main/scala/akka/persistence/TestSerializer.scala new file mode 100644 index 0000000000..460ff83d60 --- /dev/null +++ b/akka-persistence-tck/src/main/scala/akka/persistence/TestSerializer.scala @@ -0,0 +1,50 @@ +/** + * Copyright (C) 2018 Lightbend Inc. + */ + +package akka.persistence + +import java.nio.charset.StandardCharsets + +import akka.actor.ActorRef +import akka.actor.ExtendedActorSystem +import akka.serialization.Serialization +import akka.serialization.SerializerWithStringManifest + +final case class TestPayload(ref: ActorRef) + +class TestSerializer(system: ExtendedActorSystem) extends SerializerWithStringManifest { + def identifier: Int = 666 + def manifest(o: AnyRef): String = o match { + case _: TestPayload ⇒ "A" + } + def toBinary(o: AnyRef): Array[Byte] = o match { + case TestPayload(ref) ⇒ + verifyTransportInfo() + val refStr = Serialization.serializedActorPath(ref) + refStr.getBytes(StandardCharsets.UTF_8) + } + def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = { + verifyTransportInfo() + manifest match { + case "A" ⇒ + val refStr = new String(bytes, StandardCharsets.UTF_8) + val ref = system.provider.resolveActorRef(refStr) + TestPayload(ref) + } + } + + private def verifyTransportInfo(): Unit = { + Serialization.currentTransportInformation.value match { + case null ⇒ + throw new IllegalStateException("currentTransportInformation was not set") + case t ⇒ + if (t.system ne system) + throw new IllegalStateException( + s"wrong system in currentTransportInformation, ${t.system} != $system") + if (t.address != system.provider.getDefaultAddress) + throw new IllegalStateException( + s"wrong address in currentTransportInformation, ${t.address} != ${system.provider.getDefaultAddress}") + } + } +} diff --git a/akka-persistence-tck/src/main/scala/akka/persistence/japi/journal/JavaJournalPerfSpec.scala b/akka-persistence-tck/src/main/scala/akka/persistence/japi/journal/JavaJournalPerfSpec.scala index a8e8c2791c..718478d5a0 100644 --- a/akka-persistence-tck/src/main/scala/akka/persistence/japi/journal/JavaJournalPerfSpec.scala +++ b/akka-persistence-tck/src/main/scala/akka/persistence/japi/journal/JavaJournalPerfSpec.scala @@ -50,4 +50,6 @@ class JavaJournalPerfSpec(config: Config) extends JournalPerfSpec(config) { } override protected def supportsRejectingNonSerializableObjects: CapabilityFlag = CapabilityFlag.on + + override protected def supportsSerialization: CapabilityFlag = CapabilityFlag.on } diff --git a/akka-persistence-tck/src/main/scala/akka/persistence/japi/journal/JavaJournalSpec.scala b/akka-persistence-tck/src/main/scala/akka/persistence/japi/journal/JavaJournalSpec.scala index 861aaffc9f..5941ca3919 100644 --- a/akka-persistence-tck/src/main/scala/akka/persistence/japi/journal/JavaJournalSpec.scala +++ b/akka-persistence-tck/src/main/scala/akka/persistence/japi/journal/JavaJournalSpec.scala @@ -22,4 +22,6 @@ import com.typesafe.config.Config */ class JavaJournalSpec(config: Config) extends JournalSpec(config) { override protected def supportsRejectingNonSerializableObjects: CapabilityFlag = CapabilityFlag.on + + override protected def supportsSerialization: CapabilityFlag = CapabilityFlag.on } diff --git a/akka-persistence-tck/src/main/scala/akka/persistence/japi/snapshot/JavaSnapshotStoreSpec.scala b/akka-persistence-tck/src/main/scala/akka/persistence/japi/snapshot/JavaSnapshotStoreSpec.scala index 73ae0eac39..b2d399d38a 100644 --- a/akka-persistence-tck/src/main/scala/akka/persistence/japi/snapshot/JavaSnapshotStoreSpec.scala +++ b/akka-persistence-tck/src/main/scala/akka/persistence/japi/snapshot/JavaSnapshotStoreSpec.scala @@ -4,7 +4,8 @@ package akka.persistence.japi.snapshot -import akka.persistence.snapshot.{ SnapshotStoreSpec } +import akka.persistence.CapabilityFlag +import akka.persistence.snapshot.SnapshotStoreSpec import com.typesafe.config.Config /** @@ -18,4 +19,6 @@ import com.typesafe.config.Config * * @see [[akka.persistence.snapshot.SnapshotStoreSpec]] */ -class JavaSnapshotStoreSpec(config: Config) extends SnapshotStoreSpec(config) +class JavaSnapshotStoreSpec(config: Config) extends SnapshotStoreSpec(config) { + override protected def supportsSerialization: CapabilityFlag = CapabilityFlag.on +} diff --git a/akka-persistence-tck/src/main/scala/akka/persistence/journal/JournalSpec.scala b/akka-persistence-tck/src/main/scala/akka/persistence/journal/JournalSpec.scala index 9d9bd2256a..ffb955d43e 100644 --- a/akka-persistence-tck/src/main/scala/akka/persistence/journal/JournalSpec.scala +++ b/akka-persistence-tck/src/main/scala/akka/persistence/journal/JournalSpec.scala @@ -17,9 +17,17 @@ import akka.testkit._ import com.typesafe.config._ object JournalSpec { - val config = ConfigFactory.parseString( - """ + val config: Config = ConfigFactory.parseString( + s""" akka.persistence.publish-plugin-commands = on + akka.actor { + serializers { + persistence-tck-test = "${classOf[TestSerializer].getName}" + } + serialization-bindings { + "${classOf[TestPayload].getName}" = persistence-tck-test + } + } """) } @@ -43,6 +51,8 @@ abstract class JournalSpec(config: Config) extends PluginSpec(config) with MayVe private var senderProbe: TestProbe = _ private var receiverProbe: TestProbe = _ + override protected def supportsSerialization: CapabilityFlag = true + override protected def beforeEach(): Unit = { super.beforeEach() senderProbe = TestProbe() @@ -230,5 +240,31 @@ abstract class JournalSpec(config: Config) extends PluginSpec(config) with MayVe } } } + + optional(flag = supportsSerialization) { + "serialize events" in { + val probe = TestProbe() + val event = TestPayload(probe.ref) + val aw = + AtomicWrite(PersistentRepr(payload = event, sequenceNr = 6L, persistenceId = pid, sender = Actor.noSender, + writerUuid = writerUuid)) + + journal ! WriteMessages(List(aw), probe.ref, actorInstanceId) + + probe.expectMsg(WriteMessagesSuccessful) + val Pid = pid + val WriterUuid = writerUuid + probe.expectMsgPF() { + case WriteMessageSuccess(PersistentImpl(payload, 6L, Pid, _, _, Actor.noSender, WriterUuid), _) ⇒ payload should be(event) + } + + journal ! ReplayMessages(6, Long.MaxValue, Long.MaxValue, pid, receiverProbe.ref) + receiverProbe.expectMsgPF() { + case ReplayedMessage(PersistentImpl(payload, 6L, Pid, _, _, Actor.noSender, WriterUuid)) ⇒ payload should be(event) + } + receiverProbe.expectMsg(RecoverySuccess(highestSequenceNr = 6L)) + } + } + } } diff --git a/akka-persistence-tck/src/main/scala/akka/persistence/snapshot/SnapshotStoreSpec.scala b/akka-persistence-tck/src/main/scala/akka/persistence/snapshot/SnapshotStoreSpec.scala index b23df81dfe..1c736f1b57 100644 --- a/akka-persistence-tck/src/main/scala/akka/persistence/snapshot/SnapshotStoreSpec.scala +++ b/akka-persistence-tck/src/main/scala/akka/persistence/snapshot/SnapshotStoreSpec.scala @@ -4,7 +4,7 @@ package akka.persistence.snapshot -import akka.persistence.scalatest.OptionalTests +import akka.persistence.scalatest.{ MayVerb, OptionalTests } import scala.collection.immutable.Seq import akka.actor._ @@ -15,7 +15,18 @@ import com.typesafe.config.ConfigFactory import com.typesafe.config.Config object SnapshotStoreSpec { - val config = ConfigFactory.parseString("akka.persistence.publish-plugin-commands = on") + val config: Config = ConfigFactory.parseString( + s""" + akka.persistence.publish-plugin-commands = on + akka.actor { + serializers { + persistence-tck-test = "${classOf[TestSerializer].getName}" + } + serialization-bindings { + "${classOf[TestPayload].getName}" = persistence-tck-test + } + } + """) } /** @@ -30,12 +41,14 @@ object SnapshotStoreSpec { * @see [[akka.persistence.japi.snapshot.JavaSnapshotStoreSpec]] */ abstract class SnapshotStoreSpec(config: Config) extends PluginSpec(config) - with OptionalTests with SnapshotStoreCapabilityFlags { + with MayVerb with OptionalTests with SnapshotStoreCapabilityFlags { implicit lazy val system = ActorSystem("SnapshotStoreSpec", config.withFallback(SnapshotStoreSpec.config)) private var senderProbe: TestProbe = _ private var metadata: Seq[SnapshotMetadata] = Nil + override protected def supportsSerialization: CapabilityFlag = true + override protected def beforeEach(): Unit = { super.beforeEach() senderProbe = TestProbe() @@ -152,4 +165,23 @@ abstract class SnapshotStoreSpec(config: Config) extends PluginSpec(config) senderProbe.expectMsgPF() { case SaveSnapshotSuccess(md) ⇒ md } } } + + "A snapshot store optionally" may { + optional(flag = supportsSerialization) { + "serialize snapshots" in { + val probe = TestProbe() + val metadata = SnapshotMetadata(pid, 100) + val snap = TestPayload(probe.ref) + snapshotStore.tell(SaveSnapshot(metadata, snap), senderProbe.ref) + senderProbe.expectMsgPF() { case SaveSnapshotSuccess(md) ⇒ md } + + val Pid = pid + snapshotStore.tell(LoadSnapshot(pid, SnapshotSelectionCriteria.Latest, Long.MaxValue), senderProbe.ref) + senderProbe.expectMsgPF() { + case LoadSnapshotResult(Some(SelectedSnapshot(SnapshotMetadata(Pid, 100, _), payload)), Long.MaxValue) ⇒ + payload should be(snap) + } + } + } + } } diff --git a/akka-persistence-tck/src/test/scala/akka/persistence/journal/leveldb/LeveldbJournalJavaSpec.scala b/akka-persistence-tck/src/test/scala/akka/persistence/journal/leveldb/LeveldbJournalJavaSpec.scala index b3668b5ac2..f53269f584 100644 --- a/akka-persistence-tck/src/test/scala/akka/persistence/journal/leveldb/LeveldbJournalJavaSpec.scala +++ b/akka-persistence-tck/src/test/scala/akka/persistence/journal/leveldb/LeveldbJournalJavaSpec.scala @@ -15,4 +15,6 @@ class LeveldbJournalJavaSpec extends JournalSpec( with PluginCleanup { override def supportsRejectingNonSerializableObjects = true + + override def supportsSerialization = true } diff --git a/akka-persistence-tck/src/test/scala/akka/persistence/journal/leveldb/LeveldbJournalNativePerfSpec.scala b/akka-persistence-tck/src/test/scala/akka/persistence/journal/leveldb/LeveldbJournalNativePerfSpec.scala index 75136ac6d9..c4e70ac8ff 100644 --- a/akka-persistence-tck/src/test/scala/akka/persistence/journal/leveldb/LeveldbJournalNativePerfSpec.scala +++ b/akka-persistence-tck/src/test/scala/akka/persistence/journal/leveldb/LeveldbJournalNativePerfSpec.scala @@ -18,4 +18,6 @@ class LeveldbJournalNativePerfSpec extends JournalPerfSpec( override def supportsRejectingNonSerializableObjects = true + override def supportsSerialization = true + } diff --git a/akka-persistence-tck/src/test/scala/akka/persistence/journal/leveldb/LeveldbJournalNativeSpec.scala b/akka-persistence-tck/src/test/scala/akka/persistence/journal/leveldb/LeveldbJournalNativeSpec.scala index 7aa644449f..dad783bbac 100644 --- a/akka-persistence-tck/src/test/scala/akka/persistence/journal/leveldb/LeveldbJournalNativeSpec.scala +++ b/akka-persistence-tck/src/test/scala/akka/persistence/journal/leveldb/LeveldbJournalNativeSpec.scala @@ -16,4 +16,6 @@ class LeveldbJournalNativeSpec extends JournalSpec( override def supportsRejectingNonSerializableObjects = true + override def supportsSerialization = true + } diff --git a/akka-persistence-tck/src/test/scala/akka/persistence/journal/leveldb/LeveldbJournalNoAtomicPersistMultipleEventsSpec.scala b/akka-persistence-tck/src/test/scala/akka/persistence/journal/leveldb/LeveldbJournalNoAtomicPersistMultipleEventsSpec.scala index 25f6882cbc..5b7b83b0ad 100644 --- a/akka-persistence-tck/src/test/scala/akka/persistence/journal/leveldb/LeveldbJournalNoAtomicPersistMultipleEventsSpec.scala +++ b/akka-persistence-tck/src/test/scala/akka/persistence/journal/leveldb/LeveldbJournalNoAtomicPersistMultipleEventsSpec.scala @@ -21,5 +21,7 @@ class LeveldbJournalNoAtomicPersistMultipleEventsSpec extends JournalSpec( override def supportsRejectingNonSerializableObjects = true + override def supportsSerialization = true + } diff --git a/akka-persistence-tck/src/test/scala/akka/persistence/snapshot/local/LocalSnapshotStoreSpec.scala b/akka-persistence-tck/src/test/scala/akka/persistence/snapshot/local/LocalSnapshotStoreSpec.scala index f880cc8622..429fa6e1b6 100644 --- a/akka-persistence-tck/src/test/scala/akka/persistence/snapshot/local/LocalSnapshotStoreSpec.scala +++ b/akka-persistence-tck/src/test/scala/akka/persistence/snapshot/local/LocalSnapshotStoreSpec.scala @@ -4,8 +4,8 @@ package akka.persistence.snapshot.local +import akka.persistence.CapabilityFlag import com.typesafe.config.ConfigFactory - import akka.persistence.PluginCleanup import akka.persistence.snapshot.SnapshotStoreSpec @@ -16,4 +16,7 @@ class LocalSnapshotStoreSpec extends SnapshotStoreSpec( akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local" akka.persistence.snapshot-store.local.dir = "target/snapshots" """)) - with PluginCleanup + with PluginCleanup { + + override protected def supportsSerialization: CapabilityFlag = CapabilityFlag.on +} diff --git a/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala b/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala index 4dd7c88832..9df500a655 100644 --- a/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala +++ b/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala @@ -39,12 +39,6 @@ class MessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer override val includeManifest: Boolean = true - private lazy val transportInformation: Option[Serialization.Information] = { - val address = system.provider.getDefaultAddress - if (address.hasLocalScope) None - else Some(Serialization.Information(address, system)) - } - /** * Serializes persistent messages. Delegates serialization of a persistent * message's payload to a matching `akka.serialization.Serializer`. @@ -175,11 +169,12 @@ class MessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer builder } - // serialize actor references with full address information (defaultAddress) - transportInformation match { - case Some(ti) ⇒ Serialization.currentTransportInformation.withValue(ti) { payloadBuilder() } - case None ⇒ payloadBuilder() - } + val oldInfo = Serialization.currentTransportInformation.value + try { + if (oldInfo eq null) + Serialization.currentTransportInformation.value = system.provider.serializationInformation + payloadBuilder() + } finally Serialization.currentTransportInformation.value = oldInfo } // diff --git a/akka-persistence/src/main/scala/akka/persistence/serialization/SnapshotSerializer.scala b/akka-persistence/src/main/scala/akka/persistence/serialization/SnapshotSerializer.scala index df1c5413ed..51f13aa392 100644 --- a/akka-persistence/src/main/scala/akka/persistence/serialization/SnapshotSerializer.scala +++ b/akka-persistence/src/main/scala/akka/persistence/serialization/SnapshotSerializer.scala @@ -93,11 +93,12 @@ class SnapshotSerializer(val system: ExtendedActorSystem) extends BaseSerializer out.toByteArray } - // serialize actor references with full address information (defaultAddress) - transportInformation match { - case Some(ti) ⇒ Serialization.currentTransportInformation.withValue(ti) { serialize() } - case None ⇒ serialize() - } + val oldInfo = Serialization.currentTransportInformation.value + try { + if (oldInfo eq null) + Serialization.currentTransportInformation.value = system.provider.serializationInformation + serialize() + } finally Serialization.currentTransportInformation.value = oldInfo } private def snapshotFromBinary(bytes: Array[Byte]): AnyRef = { diff --git a/akka-persistence/src/main/scala/akka/persistence/snapshot/local/LocalSnapshotStore.scala b/akka-persistence/src/main/scala/akka/persistence/snapshot/local/LocalSnapshotStore.scala index d11a0ddde0..b01dd4113d 100644 --- a/akka-persistence/src/main/scala/akka/persistence/snapshot/local/LocalSnapshotStore.scala +++ b/akka-persistence/src/main/scala/akka/persistence/snapshot/local/LocalSnapshotStore.scala @@ -116,8 +116,9 @@ private[persistence] class LocalSnapshotStore(config: Config) extends SnapshotSt protected def deserialize(inputStream: InputStream): Snapshot = serializationExtension.deserialize(streamToBytes(inputStream), classOf[Snapshot]).get - protected def serialize(outputStream: OutputStream, snapshot: Snapshot): Unit = - outputStream.write(serializationExtension.findSerializerFor(snapshot).toBinary(snapshot)) + protected def serialize(outputStream: OutputStream, snapshot: Snapshot): Unit = { + outputStream.write(serializationExtension.serialize(snapshot).get) + } protected def withOutputStream(metadata: SnapshotMetadata)(p: (OutputStream) ⇒ Unit): File = { val tmpFile = snapshotFileForWrite(metadata, extension = "tmp") diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteRestartedQuarantinedSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteRestartedQuarantinedSpec.scala index 0c7dc4391a..91f62576b1 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteRestartedQuarantinedSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteRestartedQuarantinedSpec.scala @@ -131,7 +131,6 @@ abstract class RemoteRestartedQuarantinedSpec // retry because it's possible to loose the initial message here, see issue #17314 val probe = TestProbe()(freshSystem) probe.awaitAssert({ - println(s"# --") // FIXME freshSystem.actorSelection(RootActorPath(firstAddress) / "user" / "subject").tell(Identify("subject"), probe.ref) probe.expectMsgType[ActorIdentity](1.second).ref should not be (None) }, 30.seconds) diff --git a/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala b/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala index e066dce8f7..0b077cf295 100644 --- a/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala +++ b/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala @@ -43,7 +43,12 @@ private[akka] object MessageSerializer { val s = SerializationExtension(system) val serializer = s.findSerializerFor(message) val builder = SerializedMessage.newBuilder + + val oldInfo = Serialization.currentTransportInformation.value try { + if (oldInfo eq null) + Serialization.currentTransportInformation.value = system.provider.serializationInformation + builder.setMessage(ByteString.copyFrom(serializer.toBinary(message))) builder.setSerializerId(serializer.identifier) @@ -55,21 +60,27 @@ private[akka] object MessageSerializer { case NonFatal(e) ⇒ throw new SerializationException(s"Failed to serialize remote message [${message.getClass}] " + s"using serializer [${serializer.getClass}].", e) - } + } finally Serialization.currentTransportInformation.value = oldInfo } def serializeForArtery(serialization: Serialization, outboundEnvelope: OutboundEnvelope, headerBuilder: HeaderBuilder, envelope: EnvelopeBuffer): Unit = { val message = outboundEnvelope.message val serializer = serialization.findSerializerFor(message) + val oldInfo = Serialization.currentTransportInformation.value + try { + if (oldInfo eq null) + Serialization.currentTransportInformation.value = serialization.serializationInformation - headerBuilder setSerializer serializer.identifier - headerBuilder setManifest Serializers.manifestFor(serializer, message) - envelope.writeHeader(headerBuilder, outboundEnvelope) + headerBuilder.setSerializer(serializer.identifier) + headerBuilder.setManifest(Serializers.manifestFor(serializer, message)) + envelope.writeHeader(headerBuilder, outboundEnvelope) - serializer match { - case ser: ByteBufferSerializer ⇒ ser.toBinary(message, envelope.byteBuffer) - case _ ⇒ envelope.byteBuffer.put(serializer.toBinary(message)) - } + serializer match { + case ser: ByteBufferSerializer ⇒ ser.toBinary(message, envelope.byteBuffer) + case _ ⇒ envelope.byteBuffer.put(serializer.toBinary(message)) + } + + } finally Serialization.currentTransportInformation.value = oldInfo } def deserializeForArtery(system: ExtendedActorSystem, originUid: Long, serialization: Serialization, diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index f06468c335..b54e90e2a6 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -10,11 +10,9 @@ import akka.dispatch.sysmsg._ import akka.event.{ EventStream, Logging, LoggingAdapter } import akka.event.Logging.Error import akka.pattern.pipe - import scala.util.control.NonFatal import akka.actor.SystemGuardian.{ RegisterTerminationHook, TerminationHook, TerminationHookDone } - import scala.util.control.Exception.Catcher import scala.concurrent.Future @@ -29,6 +27,7 @@ import akka.remote.artery.OutboundEnvelope import akka.remote.artery.SystemMessageDelivery.SystemMessageEnvelope import akka.remote.serialization.ActorRefResolveThreadLocalCache import akka.remote.artery.tcp.ArteryTcpTransport +import akka.serialization.Serialization /** * INTERNAL API @@ -475,6 +474,21 @@ private[akka] class RemoteActorRefProvider( def getDefaultAddress: Address = transport.defaultAddress + // no need for volatile, only intended as cached value, not necessarily a singleton value + private var serializationInformationCache: OptionVal[Serialization.Information] = OptionVal.None + @InternalApi override private[akka] def serializationInformation: Serialization.Information = + serializationInformationCache match { + case OptionVal.Some(info) ⇒ info + case OptionVal.None ⇒ + if ((transport eq null) || (transport.defaultAddress eq null)) + local.serializationInformation // address not know yet, access before complete init and binding + else { + val info = Serialization.Information(transport.defaultAddress, transport.system) + serializationInformationCache = OptionVal.Some(info) + info + } + } + private def hasAddress(address: Address): Boolean = address == local.rootPath.address || address == rootPath.address || transport.addresses(address) @@ -508,6 +522,9 @@ private[akka] class RemoteActorRef private[akka] ( deploy: Option[Deploy]) extends InternalActorRef with RemoteRef { + if (path.address.hasLocalScope) + throw new IllegalArgumentException(s"Unexpected local address in RemoteActorRef [$this]") + remote match { case t: ArteryTransport ⇒ // detect mistakes such as using "akka.tcp" with Artery diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala index 633346b76b..04b63069e2 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala @@ -340,7 +340,7 @@ private[remote] abstract class ArteryTransport(_system: ExtendedActorSystem, _pr def bindAddress: UniqueAddress = _bindAddress override def localAddress: UniqueAddress = _localAddress - override def defaultAddress: Address = localAddress.address + override def defaultAddress: Address = if (_localAddress eq null) null else localAddress.address override def addresses: Set[Address] = _addresses override def localAddressForRemote(remote: Address): Address = defaultAddress diff --git a/akka-remote/src/main/scala/akka/remote/artery/Association.scala b/akka-remote/src/main/scala/akka/remote/artery/Association.scala index d9fd5a0786..cc75ff9de0 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Association.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Association.scala @@ -130,6 +130,8 @@ private[remote] class Association( import Association._ import FlightRecorderEvents._ + require(remoteAddress.port.nonEmpty) + private val log = Logging(transport.system, getClass.getName) private def flightRecorder = transport.topLevelFlightRecorder diff --git a/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala b/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala index f3816754f8..b3861c4a07 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala @@ -57,10 +57,9 @@ private[remote] class Encoder( val logic = new GraphStageLogic(shape) with InHandler with OutHandler with StageLogging with OutboundCompressionAccess { private val headerBuilder = HeaderBuilder.out() - headerBuilder setVersion version - headerBuilder setUid uniqueLocalAddress.uid + headerBuilder.setVersion(version) + headerBuilder.setUid(uniqueLocalAddress.uid) private val localAddress = uniqueLocalAddress.address - private val serializationInfo = Serialization.Information(localAddress, system) // lazy init of SerializationExtension to avoid loading serializers before ActorRefProvider has been initialized private var _serialization: OptionVal[Serialization] = OptionVal.None @@ -104,34 +103,34 @@ private[remote] class Encoder( // without depending on compression tables being in sync when systems are restarted headerBuilder.useOutboundCompression(!outboundEnvelope.message.isInstanceOf[ArteryMessage]) - // internally compression is applied by the builder: - outboundEnvelope.recipient match { - case OptionVal.Some(r) ⇒ headerBuilder setRecipientActorRef r - case OptionVal.None ⇒ headerBuilder.setNoRecipient() - } - + // Important to set Serialization.currentTransportInformation because setRecipientActorRef + // and setSenderActorRef are using using Serialization.serializedActorPath. + // Avoiding currentTransportInformation.withValue due to thunk allocation. + val oldInfo = Serialization.currentTransportInformation.value try { - // avoiding currentTransportInformation.withValue due to thunk allocation - val oldValue = Serialization.currentTransportInformation.value - try { - Serialization.currentTransportInformation.value = serializationInfo + Serialization.currentTransportInformation.value = serialization.serializationInformation - outboundEnvelope.sender match { - case OptionVal.None ⇒ headerBuilder.setNoSender() - case OptionVal.Some(s) ⇒ headerBuilder setSenderActorRef s - } + // internally compression is applied by the builder: + outboundEnvelope.recipient match { + case OptionVal.Some(r) ⇒ headerBuilder.setRecipientActorRef(r) + case OptionVal.None ⇒ headerBuilder.setNoRecipient() + } - val startTime: Long = if (instruments.timeSerialization) System.nanoTime else 0 - if (instruments.nonEmpty) - headerBuilder.setRemoteInstruments(instruments) + outboundEnvelope.sender match { + case OptionVal.None ⇒ headerBuilder.setNoSender() + case OptionVal.Some(s) ⇒ headerBuilder.setSenderActorRef(s) + } - MessageSerializer.serializeForArtery(serialization, outboundEnvelope, headerBuilder, envelope) + val startTime: Long = if (instruments.timeSerialization) System.nanoTime else 0 + if (instruments.nonEmpty) + headerBuilder.setRemoteInstruments(instruments) - if (instruments.nonEmpty) { - val time = if (instruments.timeSerialization) System.nanoTime - startTime else 0 - instruments.messageSent(outboundEnvelope, envelope.byteBuffer.position(), time) - } - } finally Serialization.currentTransportInformation.value = oldValue + MessageSerializer.serializeForArtery(serialization, outboundEnvelope, headerBuilder, envelope) + + if (instruments.nonEmpty) { + val time = if (instruments.timeSerialization) System.nanoTime - startTime else 0 + instruments.messageSent(outboundEnvelope, envelope.byteBuffer.position(), time) + } envelope.byteBuffer.flip() @@ -162,6 +161,7 @@ private[remote] class Encoder( pull(in) } } finally { + Serialization.currentTransportInformation.value = oldInfo outboundEnvelope match { case r: ReusableOutboundEnvelope ⇒ outboundEnvelopePool.release(r) case _ ⇒ // no need to release it diff --git a/akka-remote/src/main/scala/akka/remote/artery/EnvelopeBufferPool.scala b/akka-remote/src/main/scala/akka/remote/artery/EnvelopeBufferPool.scala index 5df4ae49f8..bf12a967d7 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/EnvelopeBufferPool.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/EnvelopeBufferPool.scala @@ -198,7 +198,7 @@ private[remote] sealed trait HeaderBuilder { private[remote] final class SerializationFormatCache extends LruBoundedCache[ActorRef, String](capacity = 1024, evictAgeThreshold = 600) { - override protected def compute(ref: ActorRef): String = ref.path.toSerializationFormat + override protected def compute(ref: ActorRef): String = Serialization.serializedActorPath(ref) // Not calling ref.hashCode since it does a path.hashCode if ActorCell.undefinedUid is encountered. // Refs with ActorCell.undefinedUid will now collide all the time, but this is not a usual scenario anyway. @@ -285,8 +285,11 @@ private[remote] final class HeaderBuilderImpl( } def outboundClassManifestCompression: CompressionTable[String] = _outboundClassManifestCompression + /** + * Note that Serialization.currentTransportInformation must be set when calling this method, + * because it's using `Serialization.serializedActorPath` + */ override def setSenderActorRef(ref: ActorRef): Unit = { - // serializedActorPath includes local address from `currentTransportInformation` if (_useOutboundCompression) { _senderActorRefIdx = outboundActorRefCompression.compress(ref) if (_senderActorRefIdx == -1) _senderActorRef = Serialization.serializedActorPath(ref) @@ -316,6 +319,10 @@ private[remote] final class HeaderBuilderImpl( def isNoRecipient: Boolean = (_recipientActorRef eq null) && _recipientActorRefIdx == DeadLettersCode + /** + * Note that Serialization.currentTransportInformation must be set when calling this method, + * because it's using `Serialization.serializedActorPath` + */ def setRecipientActorRef(ref: ActorRef): Unit = { if (_useOutboundCompression) { _recipientActorRefIdx = outboundActorRefCompression.compress(ref) diff --git a/akka-remote/src/test/scala/akka/remote/artery/SerializationTransportInformationSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/SerializationTransportInformationSpec.scala new file mode 100644 index 0000000000..a9099abecf --- /dev/null +++ b/akka-remote/src/test/scala/akka/remote/artery/SerializationTransportInformationSpec.scala @@ -0,0 +1,10 @@ +/** + * Copyright (C) 2018 Lightbend Inc. + */ + +package akka.remote.artery + +import akka.remote.serialization.AbstractSerializationTransportInformationSpec + +class SerializationTransportInformationSpec extends AbstractSerializationTransportInformationSpec( + ArterySpecSupport.defaultConfig) diff --git a/akka-remote/src/test/scala/akka/remote/serialization/SerializationTransportInformationSpec.scala b/akka-remote/src/test/scala/akka/remote/serialization/SerializationTransportInformationSpec.scala new file mode 100644 index 0000000000..5e7b69352f --- /dev/null +++ b/akka-remote/src/test/scala/akka/remote/serialization/SerializationTransportInformationSpec.scala @@ -0,0 +1,135 @@ +/** + * Copyright (C) 2018 Lightbend Inc. + */ + +package akka.remote.serialization + +import java.nio.charset.StandardCharsets + +import akka.actor.ActorIdentity +import akka.serialization.Serialization +import akka.actor.ActorRef +import akka.actor.ActorSystem +import akka.actor.ExtendedActorSystem +import akka.actor.Identify +import akka.actor.RootActorPath +import akka.remote.RARP +import akka.serialization.SerializerWithStringManifest +import akka.testkit.AkkaSpec +import akka.testkit.ImplicitSender +import akka.testkit.TestActors +import com.typesafe.config.Config +import com.typesafe.config.ConfigFactory + +object SerializationTransportInformationSpec { + + final case class TestMessage(from: ActorRef, to: ActorRef) + final case class JavaSerTestMessage(from: ActorRef, to: ActorRef) + + class TestSerializer(system: ExtendedActorSystem) extends SerializerWithStringManifest { + def identifier: Int = 666 + def manifest(o: AnyRef): String = o match { + case _: TestMessage ⇒ "A" + } + def toBinary(o: AnyRef): Array[Byte] = o match { + case TestMessage(from, to) ⇒ + verifyTransportInfo() + val fromStr = Serialization.serializedActorPath(from) + val toStr = Serialization.serializedActorPath(to) + s"$fromStr,$toStr".getBytes(StandardCharsets.UTF_8) + } + def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = { + verifyTransportInfo() + manifest match { + case "A" ⇒ + val parts = new String(bytes, StandardCharsets.UTF_8).split(',') + val fromStr = parts(0) + val toStr = parts(1) + val from = system.provider.resolveActorRef(fromStr) + val to = system.provider.resolveActorRef(toStr) + TestMessage(from, to) + } + } + + private def verifyTransportInfo(): Unit = { + Serialization.currentTransportInformation.value match { + case null ⇒ + throw new IllegalStateException("currentTransportInformation was not set") + case t ⇒ + if (t.system ne system) + throw new IllegalStateException( + s"wrong system in currentTransportInformation, ${t.system} != $system") + if (t.address != system.provider.getDefaultAddress) + throw new IllegalStateException( + s"wrong address in currentTransportInformation, ${t.address} != ${system.provider.getDefaultAddress}") + } + } + } +} + +abstract class AbstractSerializationTransportInformationSpec(config: Config) extends AkkaSpec( + config.withFallback(ConfigFactory.parseString( + """ + akka { + loglevel = info + actor { + provider = remote + warn-about-java-serializer-usage = off + serialize-creators = off + serializers { + test = "akka.remote.serialization.SerializationTransportInformationSpec$TestSerializer" + } + serialization-bindings { + "akka.remote.serialization.SerializationTransportInformationSpec$TestMessage" = test + "akka.remote.serialization.SerializationTransportInformationSpec$JavaSerTestMessage" = java + } + } + } + """))) with ImplicitSender { + + import SerializationTransportInformationSpec._ + + val port = system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress.port.get + val sysName = system.name + val protocol = + if (RARP(system).provider.remoteSettings.Artery.Enabled) "akka" + else "akka.tcp" + + val system2 = ActorSystem(system.name, system.settings.config) + val system2Address = RARP(system2).provider.getDefaultAddress + + "Serialization of ActorRef in remote message" must { + + "resolve address" in { + system2.actorOf(TestActors.echoActorProps, "echo") + + val echoSel = system.actorSelection(RootActorPath(system2Address) / "user" / "echo") + echoSel ! Identify(1) + val echo = expectMsgType[ActorIdentity].ref.get + + echo ! TestMessage(testActor, echo) + expectMsg(TestMessage(testActor, echo)) + + echo ! JavaSerTestMessage(testActor, echo) + expectMsg(JavaSerTestMessage(testActor, echo)) + + echo ! testActor + expectMsg(testActor) + + echo ! echo + expectMsg(echo) + + } + } + + override def afterTermination(): Unit = { + shutdown(system2) + } +} + +class SerializationTransportInformationSpec extends AbstractSerializationTransportInformationSpec(ConfigFactory.parseString(""" + akka.remote.netty.tcp { + hostname = localhost + port = 0 + } +""")) diff --git a/akka-stream/src/main/scala/akka/stream/impl/streamref/SourceRefImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/streamref/SourceRefImpl.scala index 10ff7d24a6..8dcccf2c41 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/streamref/SourceRefImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/streamref/SourceRefImpl.scala @@ -218,7 +218,7 @@ private[stream] final class SourceRefStageImpl[Out]( } // else, ref is valid and we don't need to do anything with it } - /** @throws InvalidSequenceNumberException when sequence number is is invalid */ + /** @throws InvalidSequenceNumberException when sequence number is invalid */ def observeAndValidateSequenceNr(seqNr: Long, msg: String): Unit = if (isInvalidSequenceNr(seqNr)) { throw InvalidSequenceNumberException(expectingSeqNr, seqNr, msg)