diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala index befa20f00d..ff702530b8 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala @@ -313,7 +313,7 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout { (intercept[java.lang.IllegalStateException] { in.readObject }).getMessage should ===("Trying to deserialize a serialized ActorRef without an ActorSystem in scope." + - " Use 'akka.serialization.Serialization.currentSystem.withValue(system) { ... }'") + " Use 'akka.serialization.JavaSerializer.currentSystem.withValue(system) { ... }'") } "return EmptyLocalActorRef on deserialize if not present in actor hierarchy (and remoting is not enabled)" in { diff --git a/akka-actor/src/main/mima-filters/2.5.12.backwards.excludes b/akka-actor/src/main/mima-filters/2.5.12.backwards.excludes index 0d0fd144f2..7cd4c8ad44 100644 --- a/akka-actor/src/main/mima-filters/2.5.12.backwards.excludes +++ b/akka-actor/src/main/mima-filters/2.5.12.backwards.excludes @@ -1,3 +1,6 @@ +# #25067 Serialization.Information +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.actor.ActorRefProvider.serializationInformation") + # #24646 java.time.Duration ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.actor.AbstractActor#ActorContext.cancelReceiveTimeout") -ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.actor.AbstractActor#ActorContext.setReceiveTimeout") \ No newline at end of file +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.actor.AbstractActor#ActorContext.setReceiveTimeout") diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 035faa3c6b..a869f5366c 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -422,7 +422,7 @@ private[akka] final case class SerializedActorRef private (path: String) { case null ⇒ throw new IllegalStateException( "Trying to deserialize a serialized ActorRef without an ActorSystem in scope." + - " Use 'akka.serialization.Serialization.currentSystem.withValue(system) { ... }'") + " Use 'akka.serialization.JavaSerializer.currentSystem.withValue(system) { ... }'") case someSystem ⇒ someSystem.provider.resolveActorRef(path) } diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index 8e93e6f2ae..62361ea02e 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -5,23 +5,30 @@ package akka.actor import akka.dispatch.sysmsg._ -import akka.dispatch.{ UnboundedMessageQueueSemantics, RequiresMessageQueue } +import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics } import akka.routing._ import akka.event._ -import akka.util.{ Helpers } +import akka.util.Helpers import akka.japi.Util.immutableSeq import akka.util.Collections.EmptyImmutableSeq import scala.util.control.NonFatal import java.util.concurrent.atomic.AtomicLong + import scala.concurrent.{ ExecutionContextExecutor, Future, Promise } import scala.annotation.implicitNotFound + import akka.ConfigurationException +import akka.annotation.DoNotInherit +import akka.annotation.InternalApi import akka.dispatch.Mailboxes +import akka.serialization.Serialization +import akka.util.OptionVal /** * Interface for all ActorRef providers to implement. + * Not intended for extension outside of Akka. */ -trait ActorRefProvider { +@DoNotInherit trait ActorRefProvider { /** * Reference to the supervisor of guardian and systemGuardian; this is @@ -179,6 +186,9 @@ trait ActorRefProvider { * Obtain the external address of the default transport. */ def getDefaultAddress: Address + + /** INTERNAL API */ + @InternalApi private[akka] def serializationInformation: Serialization.Information } /** @@ -795,4 +805,21 @@ private[akka] class LocalActorRefProvider private[akka] ( def getExternalAddressFor(addr: Address): Option[Address] = if (addr == rootPath.address) Some(addr) else None def getDefaultAddress: Address = rootPath.address + + // no need for volatile, only intended as cached value, not necessarily a singleton value + private var serializationInformationCache: OptionVal[Serialization.Information] = OptionVal.None + @InternalApi override private[akka] def serializationInformation: Serialization.Information = { + Serialization.Information(getDefaultAddress, system) + serializationInformationCache match { + case OptionVal.Some(info) ⇒ info + case OptionVal.None ⇒ + if (system eq null) + throw new IllegalStateException("Too early access of serializationInformation") + else { + val info = Serialization.Information(rootPath.address, system) + serializationInformationCache = OptionVal.Some(info) + info + } + } + } } diff --git a/akka-actor/src/main/scala/akka/actor/TypedActor.scala b/akka-actor/src/main/scala/akka/actor/TypedActor.scala index 24933c9a9a..3dafdf70aa 100644 --- a/akka-actor/src/main/scala/akka/actor/TypedActor.scala +++ b/akka-actor/src/main/scala/akka/actor/TypedActor.scala @@ -177,7 +177,7 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi val system = akka.serialization.JavaSerializer.currentSystem.value if (system eq null) throw new IllegalStateException( "Trying to deserialize a SerializedMethodCall without an ActorSystem in scope." + - " Use akka.serialization.Serialization.currentSystem.withValue(system) { ... }") + " Use akka.serialization.JavaSerializer.currentSystem.withValue(system) { ... }") val serialization = SerializationExtension(system) MethodCall(ownerType.getDeclaredMethod(methodName, parameterTypes: _*), serializedParameters match { case null ⇒ null @@ -443,7 +443,8 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi */ private[akka] final case class SerializedTypedActorInvocationHandler(val actor: ActorRef, val timeout: FiniteDuration) { @throws(classOf[ObjectStreamException]) private def readResolve(): AnyRef = JavaSerializer.currentSystem.value match { - case null ⇒ throw new IllegalStateException("SerializedTypedActorInvocationHandler.readResolve requires that JavaSerializer.currentSystem.value is set to a non-null value") + case null ⇒ throw new IllegalStateException("SerializedTypedActorInvocationHandler.readResolve requires that " + + "JavaSerializer.currentSystem.value is set to a non-null value") case some ⇒ toTypedActorInvocationHandler(some) } diff --git a/akka-actor/src/main/scala/akka/actor/dungeon/Children.scala b/akka-actor/src/main/scala/akka/actor/dungeon/Children.scala index 1f22a70f67..edc45ff73f 100644 --- a/akka-actor/src/main/scala/akka/actor/dungeon/Children.scala +++ b/akka-actor/src/main/scala/akka/actor/dungeon/Children.scala @@ -7,8 +7,9 @@ package akka.actor.dungeon import scala.annotation.tailrec import scala.util.control.NonFatal import scala.collection.immutable + import akka.actor._ -import akka.serialization.{ SerializationExtension, Serializers } +import akka.serialization.{ Serialization, SerializationExtension, Serializers } import akka.util.{ Helpers, Unsafe } import java.util.Optional @@ -241,13 +242,16 @@ private[akka] trait Children { this: ActorCell ⇒ } private def makeChild(cell: ActorCell, props: Props, name: String, async: Boolean, systemService: Boolean): ActorRef = { - if (cell.system.settings.SerializeAllCreators && !systemService && props.deploy.scope != LocalScope) + if (cell.system.settings.SerializeAllCreators && !systemService && props.deploy.scope != LocalScope) { + val oldInfo = Serialization.currentTransportInformation.value try { val ser = SerializationExtension(cell.system) + if (oldInfo eq null) + Serialization.currentTransportInformation.value = system.provider.serializationInformation + props.args forall (arg ⇒ arg == null || - arg.isInstanceOf[NoSerializationVerificationNeeded] || - { + arg.isInstanceOf[NoSerializationVerificationNeeded] || { val o = arg.asInstanceOf[AnyRef] val serializer = ser.findSerializerFor(o) val bytes = serializer.toBinary(o) @@ -256,7 +260,9 @@ private[akka] trait Children { this: ActorCell ⇒ }) } catch { case NonFatal(e) ⇒ throw new IllegalArgumentException(s"pre-creation serialization check failed at [${cell.self.path}/$name]", e) - } + } finally Serialization.currentTransportInformation.value = oldInfo + } + /* * in case we are currently terminating, fail external attachChild requests * (internal calls cannot happen anyway because we are suspended) diff --git a/akka-actor/src/main/scala/akka/actor/dungeon/Dispatch.scala b/akka-actor/src/main/scala/akka/actor/dungeon/Dispatch.scala index 3d087f624c..3c78e417a1 100644 --- a/akka-actor/src/main/scala/akka/actor/dungeon/Dispatch.scala +++ b/akka-actor/src/main/scala/akka/actor/dungeon/Dispatch.scala @@ -5,6 +5,7 @@ package akka.actor.dungeon import scala.annotation.tailrec + import akka.AkkaException import akka.dispatch.{ Envelope, Mailbox } import akka.dispatch.sysmsg._ @@ -12,12 +13,13 @@ import akka.event.Logging.Error import akka.util.Unsafe import akka.actor._ import akka.serialization.{ DisabledJavaSerializer, SerializationExtension, Serializers } - import scala.util.control.{ NoStackTrace, NonFatal } import scala.util.control.Exception.Catcher + import akka.dispatch.MailboxType import akka.dispatch.ProducesMessageQueue import akka.dispatch.UnboundedMailbox +import akka.serialization.Serialization @SerialVersionUID(1L) final case class SerializationCheckFailedException private (msg: Object, cause: Throwable) @@ -169,9 +171,14 @@ private[akka] trait Dispatch { this: ActorCell ⇒ if (serializer.isInstanceOf[DisabledJavaSerializer] && !s.shouldWarnAboutJavaSerializer(obj.getClass, serializer)) obj // skip check for known "local" messages else { - val bytes = serializer.toBinary(obj) - val ms = Serializers.manifestFor(serializer, obj) - s.deserialize(bytes, serializer.identifier, ms).get + val oldInfo = Serialization.currentTransportInformation.value + try { + if (oldInfo eq null) + Serialization.currentTransportInformation.value = system.provider.serializationInformation + val bytes = serializer.toBinary(obj) + val ms = Serializers.manifestFor(serializer, obj) + s.deserialize(bytes, serializer.identifier, ms).get + } finally Serialization.currentTransportInformation.value = oldInfo } } diff --git a/akka-actor/src/main/scala/akka/serialization/Serialization.scala b/akka-actor/src/main/scala/akka/serialization/Serialization.scala index 08741d30b5..c6be3b16c3 100644 --- a/akka-actor/src/main/scala/akka/serialization/Serialization.scala +++ b/akka-actor/src/main/scala/akka/serialization/Serialization.scala @@ -31,11 +31,11 @@ object Serialization { type ClassSerializer = (Class[_], Serializer) /** - * This holds a reference to the current transport serialization information used for - * serializing local actor refs. - * INTERNAL API + * INTERNAL API: This holds a reference to the current transport serialization information used for + * serializing local actor refs, or if serializer library e.g. custom serializer/deserializer in + * Jackson need access to the current `ActorSystem`. */ - private[akka] val currentTransportInformation = new DynamicVariable[Information](null) + @InternalApi private[akka] val currentTransportInformation = new DynamicVariable[Information](null) class Settings(val config: Config) { val Serializers: Map[String, String] = configToMap(config.getConfig("akka.actor.serializers")) @@ -66,16 +66,11 @@ object Serialization { } } - /** - * Serialization information needed for serializing local actor refs. - * INTERNAL API - */ - private[akka] final case class Information(address: Address, system: ActorSystem) - /** * The serialized path of an actorRef, based on the current transport serialization information. - * If there is no external address available for the requested address then the systems default - * address will be used. + * If there is no external address available in the given `ActorRef` then the systems default + * address will be used and that is retrieved from the ThreadLocal `Serialization.Information` + * that was set with [[Serialization#withTransportInformation]]. */ def serializedActorPath(actorRef: ActorRef): String = { val path = actorRef.path @@ -101,20 +96,47 @@ object Serialization { } /** - * Use the specified @param system to determine transport information that will be used when serializing actorRefs - * in @param f code: if there is no external address available for the requested address then the systems default - * address will be used. + * Serialization information needed for serializing local actor refs, + * or if serializer library e.g. custom serializer/deserializer in Jackson need + * access to the current `ActorSystem`. + */ + final case class Information(address: Address, system: ActorSystem) + + /** + * Sets serialization information in a `ThreadLocal` and runs `f`. The information is + * needed for serializing local actor refs, or if serializer library e.g. custom serializer/deserializer + * in Jackson need access to the current `ActorSystem`. The current [[Information]] can be accessed within + * `f` via [[Serialization#getCurrentTransportInformation]]. * - * @return value returned by @param f + * Akka Remoting sets this value when serializing and deserializing messages, and when using + * the ordinary `serialize` and `deserialize` methods in [[Serialization]] the value is also + * set automatically. + * + * @return value returned by `f` */ def withTransportInformation[T](system: ExtendedActorSystem)(f: () ⇒ T): T = { - val address = system.provider.getDefaultAddress - if (address.hasLocalScope) { - f() - } else { - Serialization.currentTransportInformation.withValue(Serialization.Information(address, system)) { + val info = system.provider.serializationInformation + if (Serialization.currentTransportInformation.value eq info) + f() // already set + else + Serialization.currentTransportInformation.withValue(info) { f() } + } + + /** + * Gets the serialization information from a `ThreadLocal` that was assigned via + * [[Serialization#withTransportInformation]]. The information is needed for serializing + * local actor refs, or if serializer library e.g. custom serializer/deserializer + * in Jackson need access to the current `ActorSystem`. + * + * @throws IllegalStateException if the information was not set + */ + def getCurrentTransportInformation(): Information = { + Serialization.currentTransportInformation.value match { + case null ⇒ throw new IllegalStateException( + "currentTransportInformation is not set, use Serialization.withTransportInformation") + case t ⇒ t } } @@ -134,11 +156,28 @@ class Serialization(val system: ExtendedActorSystem) extends Extension { val log: LoggingAdapter = _log private val manifestCache = new AtomicReference[Map[String, Option[Class[_]]]](Map.empty[String, Option[Class[_]]]) + /** INTERNAL API */ + @InternalApi private[akka] def serializationInformation: Serialization.Information = + system.provider.serializationInformation + + private def withTransportInformation[T](f: () ⇒ T): T = { + val oldInfo = Serialization.currentTransportInformation.value + try { + if (oldInfo eq null) + Serialization.currentTransportInformation.value = serializationInformation + f() + } finally Serialization.currentTransportInformation.value = oldInfo + } + /** * Serializes the given AnyRef/java.lang.Object according to the Serialization configuration * to either an Array of Bytes or an Exception if one was thrown. */ - def serialize(o: AnyRef): Try[Array[Byte]] = Try(findSerializerFor(o).toBinary(o)) + def serialize(o: AnyRef): Try[Array[Byte]] = { + withTransportInformation { () ⇒ + Try(findSerializerFor(o).toBinary(o)) + } + } /** * Deserializes the given array of bytes using the specified serializer id, @@ -152,7 +191,9 @@ class Serialization(val system: ExtendedActorSystem) extends Extension { s"Cannot find serializer with id [$serializerId]. The most probable reason is that the configuration entry " + "akka.actor.serializers is not in synch between the two systems.") } - serializer.fromBinary(bytes, clazz).asInstanceOf[T] + withTransportInformation { () ⇒ + serializer.fromBinary(bytes, clazz).asInstanceOf[T] + } } /** @@ -177,27 +218,29 @@ class Serialization(val system: ExtendedActorSystem) extends Extension { updateCache(manifestCache.get, key, value) // recursive, try again } - serializer match { - case s2: SerializerWithStringManifest ⇒ s2.fromBinary(bytes, manifest) - case s1 ⇒ - if (manifest == "") - s1.fromBinary(bytes, None) - else { - val cache = manifestCache.get - cache.get(manifest) match { - case Some(cachedClassManifest) ⇒ s1.fromBinary(bytes, cachedClassManifest) - case None ⇒ - system.dynamicAccess.getClassFor[AnyRef](manifest) match { - case Success(classManifest) ⇒ - val classManifestOption: Option[Class[_]] = Some(classManifest) - updateCache(cache, manifest, classManifestOption) - s1.fromBinary(bytes, classManifestOption) - case Failure(e) ⇒ - throw new NotSerializableException( - s"Cannot find manifest class [$manifest] for serializer with id [${serializer.identifier}].") - } + withTransportInformation { () ⇒ + serializer match { + case s2: SerializerWithStringManifest ⇒ s2.fromBinary(bytes, manifest) + case s1 ⇒ + if (manifest == "") + s1.fromBinary(bytes, None) + else { + val cache = manifestCache.get + cache.get(manifest) match { + case Some(cachedClassManifest) ⇒ s1.fromBinary(bytes, cachedClassManifest) + case None ⇒ + system.dynamicAccess.getClassFor[AnyRef](manifest) match { + case Success(classManifest) ⇒ + val classManifestOption: Option[Class[_]] = Some(classManifest) + updateCache(cache, manifest, classManifestOption) + s1.fromBinary(bytes, classManifestOption) + case Failure(e) ⇒ + throw new NotSerializableException( + s"Cannot find manifest class [$manifest] for serializer with id [${serializer.identifier}].") + } + } } - } + } } } @@ -213,22 +256,34 @@ class Serialization(val system: ExtendedActorSystem) extends Extension { s"Cannot find serializer with id [$serializerId]. The most probable reason is that the configuration entry " + "akka.actor.serializers is not in synch between the two systems.") } - serializer match { - case ser: ByteBufferSerializer ⇒ - ser.fromBinary(buf, manifest) - case _ ⇒ - val bytes = new Array[Byte](buf.remaining()) - buf.get(bytes) - deserializeByteArray(bytes, serializer, manifest) - } + + // not using `withTransportInformation { () =>` because deserializeByteBuffer is supposed to be the + // possibility for allocation free serialization + val oldInfo = Serialization.currentTransportInformation.value + try { + if (oldInfo eq null) + Serialization.currentTransportInformation.value = serializationInformation + + serializer match { + case ser: ByteBufferSerializer ⇒ + ser.fromBinary(buf, manifest) + case _ ⇒ + val bytes = new Array[Byte](buf.remaining()) + buf.get(bytes) + deserializeByteArray(bytes, serializer, manifest) + } + } finally Serialization.currentTransportInformation.value = oldInfo } /** * Deserializes the given array of bytes using the specified type to look up what Serializer should be used. * Returns either the resulting object or an Exception if one was thrown. */ - def deserialize[T](bytes: Array[Byte], clazz: Class[T]): Try[T] = - Try(serializerFor(clazz).fromBinary(bytes, Some(clazz)).asInstanceOf[T]) + def deserialize[T](bytes: Array[Byte], clazz: Class[T]): Try[T] = { + withTransportInformation { () ⇒ + Try(serializerFor(clazz).fromBinary(bytes, Some(clazz)).asInstanceOf[T]) + } + } /** * Returns the Serializer configured for the given object, returns the NullSerializer if it's null. diff --git a/akka-actor/src/main/scala/akka/serialization/Serializer.scala b/akka-actor/src/main/scala/akka/serialization/Serializer.scala index 33ce6318ef..49a84479c3 100644 --- a/akka-actor/src/main/scala/akka/serialization/Serializer.scala +++ b/akka-actor/src/main/scala/akka/serialization/Serializer.scala @@ -288,13 +288,13 @@ object JavaSerializer { * If you are using Serializers yourself, outside of SerializationExtension, * you'll need to surround the serialization/deserialization with: * - * currentSystem.withValue(system) { + * JavaSerializer.currentSystem.withValue(system) { * ...code... * } * * or * - * currentSystem.withValue(system, callable) + * JavaSerializer.currentSystem.withValue(system, callable) */ val currentSystem = new CurrentSystem final class CurrentSystem extends DynamicVariable[ExtendedActorSystem](null) { diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/SerializationSupport.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/SerializationSupport.scala index 8cdd4ae502..8533e5e63a 100644 --- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/SerializationSupport.scala +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/SerializationSupport.scala @@ -149,11 +149,14 @@ trait SerializationSupport { // Serialize actor references with full address information (defaultAddress). // When sending remote messages currentTransportInformation is already set, - // but when serializing for digests it must be set here. - if (Serialization.currentTransportInformation.value == null) - Serialization.currentTransportInformation.withValue(transportInformation) { buildOther() } - else + // but when serializing for digests or DurableStore it must be set here. + val oldInfo = Serialization.currentTransportInformation.value + try { + if (oldInfo eq null) + Serialization.currentTransportInformation.value = system.provider.serializationInformation buildOther() + } finally Serialization.currentTransportInformation.value = oldInfo + } def otherMessageFromBinary(bytes: Array[Byte]): AnyRef = diff --git a/akka-docs/src/test/scala/docs/persistence/PersistencePluginDocSpec.scala b/akka-docs/src/test/scala/docs/persistence/PersistencePluginDocSpec.scala index 24dcd24c55..2f17159fc0 100644 --- a/akka-docs/src/test/scala/docs/persistence/PersistencePluginDocSpec.scala +++ b/akka-docs/src/test/scala/docs/persistence/PersistencePluginDocSpec.scala @@ -193,6 +193,9 @@ object PersistenceTCKDoc { override def supportsRejectingNonSerializableObjects: CapabilityFlag = false // or CapabilityFlag.off + + override def supportsSerialization: CapabilityFlag = + true // or CapabilityFlag.on } //#journal-tck-scala } @@ -204,7 +207,11 @@ object PersistenceTCKDoc { config = ConfigFactory.parseString( """ akka.persistence.snapshot-store.plugin = "my.snapshot-store.plugin" - """)) + """)) { + + override def supportsSerialization: CapabilityFlag = + true // or CapabilityFlag.on + } //#snapshot-store-tck-scala } new AnyRef { diff --git a/akka-persistence-tck/src/main/scala/akka/persistence/CapabilityFlags.scala b/akka-persistence-tck/src/main/scala/akka/persistence/CapabilityFlags.scala index 59fd64bbe3..40da998bfd 100644 --- a/akka-persistence-tck/src/main/scala/akka/persistence/CapabilityFlags.scala +++ b/akka-persistence-tck/src/main/scala/akka/persistence/CapabilityFlags.scala @@ -44,11 +44,21 @@ trait JournalCapabilityFlags extends CapabilityFlags { */ protected def supportsRejectingNonSerializableObjects: CapabilityFlag + /** + * When `true` enables tests which check if the Journal properly serialize and + * deserialize events. + */ + protected def supportsSerialization: CapabilityFlag + } //#journal-flags //#snapshot-store-flags trait SnapshotStoreCapabilityFlags extends CapabilityFlags { - // no flags currently + /** + * When `true` enables tests which check if the snapshot store properly serialize and + * deserialize snapshots. + */ + protected def supportsSerialization: CapabilityFlag } //#snapshot-store-flags diff --git a/akka-persistence-tck/src/main/scala/akka/persistence/TestSerializer.scala b/akka-persistence-tck/src/main/scala/akka/persistence/TestSerializer.scala new file mode 100644 index 0000000000..460ff83d60 --- /dev/null +++ b/akka-persistence-tck/src/main/scala/akka/persistence/TestSerializer.scala @@ -0,0 +1,50 @@ +/** + * Copyright (C) 2018 Lightbend Inc. + */ + +package akka.persistence + +import java.nio.charset.StandardCharsets + +import akka.actor.ActorRef +import akka.actor.ExtendedActorSystem +import akka.serialization.Serialization +import akka.serialization.SerializerWithStringManifest + +final case class TestPayload(ref: ActorRef) + +class TestSerializer(system: ExtendedActorSystem) extends SerializerWithStringManifest { + def identifier: Int = 666 + def manifest(o: AnyRef): String = o match { + case _: TestPayload ⇒ "A" + } + def toBinary(o: AnyRef): Array[Byte] = o match { + case TestPayload(ref) ⇒ + verifyTransportInfo() + val refStr = Serialization.serializedActorPath(ref) + refStr.getBytes(StandardCharsets.UTF_8) + } + def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = { + verifyTransportInfo() + manifest match { + case "A" ⇒ + val refStr = new String(bytes, StandardCharsets.UTF_8) + val ref = system.provider.resolveActorRef(refStr) + TestPayload(ref) + } + } + + private def verifyTransportInfo(): Unit = { + Serialization.currentTransportInformation.value match { + case null ⇒ + throw new IllegalStateException("currentTransportInformation was not set") + case t ⇒ + if (t.system ne system) + throw new IllegalStateException( + s"wrong system in currentTransportInformation, ${t.system} != $system") + if (t.address != system.provider.getDefaultAddress) + throw new IllegalStateException( + s"wrong address in currentTransportInformation, ${t.address} != ${system.provider.getDefaultAddress}") + } + } +} diff --git a/akka-persistence-tck/src/main/scala/akka/persistence/japi/journal/JavaJournalPerfSpec.scala b/akka-persistence-tck/src/main/scala/akka/persistence/japi/journal/JavaJournalPerfSpec.scala index a8e8c2791c..718478d5a0 100644 --- a/akka-persistence-tck/src/main/scala/akka/persistence/japi/journal/JavaJournalPerfSpec.scala +++ b/akka-persistence-tck/src/main/scala/akka/persistence/japi/journal/JavaJournalPerfSpec.scala @@ -50,4 +50,6 @@ class JavaJournalPerfSpec(config: Config) extends JournalPerfSpec(config) { } override protected def supportsRejectingNonSerializableObjects: CapabilityFlag = CapabilityFlag.on + + override protected def supportsSerialization: CapabilityFlag = CapabilityFlag.on } diff --git a/akka-persistence-tck/src/main/scala/akka/persistence/japi/journal/JavaJournalSpec.scala b/akka-persistence-tck/src/main/scala/akka/persistence/japi/journal/JavaJournalSpec.scala index 861aaffc9f..5941ca3919 100644 --- a/akka-persistence-tck/src/main/scala/akka/persistence/japi/journal/JavaJournalSpec.scala +++ b/akka-persistence-tck/src/main/scala/akka/persistence/japi/journal/JavaJournalSpec.scala @@ -22,4 +22,6 @@ import com.typesafe.config.Config */ class JavaJournalSpec(config: Config) extends JournalSpec(config) { override protected def supportsRejectingNonSerializableObjects: CapabilityFlag = CapabilityFlag.on + + override protected def supportsSerialization: CapabilityFlag = CapabilityFlag.on } diff --git a/akka-persistence-tck/src/main/scala/akka/persistence/japi/snapshot/JavaSnapshotStoreSpec.scala b/akka-persistence-tck/src/main/scala/akka/persistence/japi/snapshot/JavaSnapshotStoreSpec.scala index 73ae0eac39..b2d399d38a 100644 --- a/akka-persistence-tck/src/main/scala/akka/persistence/japi/snapshot/JavaSnapshotStoreSpec.scala +++ b/akka-persistence-tck/src/main/scala/akka/persistence/japi/snapshot/JavaSnapshotStoreSpec.scala @@ -4,7 +4,8 @@ package akka.persistence.japi.snapshot -import akka.persistence.snapshot.{ SnapshotStoreSpec } +import akka.persistence.CapabilityFlag +import akka.persistence.snapshot.SnapshotStoreSpec import com.typesafe.config.Config /** @@ -18,4 +19,6 @@ import com.typesafe.config.Config * * @see [[akka.persistence.snapshot.SnapshotStoreSpec]] */ -class JavaSnapshotStoreSpec(config: Config) extends SnapshotStoreSpec(config) +class JavaSnapshotStoreSpec(config: Config) extends SnapshotStoreSpec(config) { + override protected def supportsSerialization: CapabilityFlag = CapabilityFlag.on +} diff --git a/akka-persistence-tck/src/main/scala/akka/persistence/journal/JournalSpec.scala b/akka-persistence-tck/src/main/scala/akka/persistence/journal/JournalSpec.scala index 9d9bd2256a..ffb955d43e 100644 --- a/akka-persistence-tck/src/main/scala/akka/persistence/journal/JournalSpec.scala +++ b/akka-persistence-tck/src/main/scala/akka/persistence/journal/JournalSpec.scala @@ -17,9 +17,17 @@ import akka.testkit._ import com.typesafe.config._ object JournalSpec { - val config = ConfigFactory.parseString( - """ + val config: Config = ConfigFactory.parseString( + s""" akka.persistence.publish-plugin-commands = on + akka.actor { + serializers { + persistence-tck-test = "${classOf[TestSerializer].getName}" + } + serialization-bindings { + "${classOf[TestPayload].getName}" = persistence-tck-test + } + } """) } @@ -43,6 +51,8 @@ abstract class JournalSpec(config: Config) extends PluginSpec(config) with MayVe private var senderProbe: TestProbe = _ private var receiverProbe: TestProbe = _ + override protected def supportsSerialization: CapabilityFlag = true + override protected def beforeEach(): Unit = { super.beforeEach() senderProbe = TestProbe() @@ -230,5 +240,31 @@ abstract class JournalSpec(config: Config) extends PluginSpec(config) with MayVe } } } + + optional(flag = supportsSerialization) { + "serialize events" in { + val probe = TestProbe() + val event = TestPayload(probe.ref) + val aw = + AtomicWrite(PersistentRepr(payload = event, sequenceNr = 6L, persistenceId = pid, sender = Actor.noSender, + writerUuid = writerUuid)) + + journal ! WriteMessages(List(aw), probe.ref, actorInstanceId) + + probe.expectMsg(WriteMessagesSuccessful) + val Pid = pid + val WriterUuid = writerUuid + probe.expectMsgPF() { + case WriteMessageSuccess(PersistentImpl(payload, 6L, Pid, _, _, Actor.noSender, WriterUuid), _) ⇒ payload should be(event) + } + + journal ! ReplayMessages(6, Long.MaxValue, Long.MaxValue, pid, receiverProbe.ref) + receiverProbe.expectMsgPF() { + case ReplayedMessage(PersistentImpl(payload, 6L, Pid, _, _, Actor.noSender, WriterUuid)) ⇒ payload should be(event) + } + receiverProbe.expectMsg(RecoverySuccess(highestSequenceNr = 6L)) + } + } + } } diff --git a/akka-persistence-tck/src/main/scala/akka/persistence/snapshot/SnapshotStoreSpec.scala b/akka-persistence-tck/src/main/scala/akka/persistence/snapshot/SnapshotStoreSpec.scala index b23df81dfe..1c736f1b57 100644 --- a/akka-persistence-tck/src/main/scala/akka/persistence/snapshot/SnapshotStoreSpec.scala +++ b/akka-persistence-tck/src/main/scala/akka/persistence/snapshot/SnapshotStoreSpec.scala @@ -4,7 +4,7 @@ package akka.persistence.snapshot -import akka.persistence.scalatest.OptionalTests +import akka.persistence.scalatest.{ MayVerb, OptionalTests } import scala.collection.immutable.Seq import akka.actor._ @@ -15,7 +15,18 @@ import com.typesafe.config.ConfigFactory import com.typesafe.config.Config object SnapshotStoreSpec { - val config = ConfigFactory.parseString("akka.persistence.publish-plugin-commands = on") + val config: Config = ConfigFactory.parseString( + s""" + akka.persistence.publish-plugin-commands = on + akka.actor { + serializers { + persistence-tck-test = "${classOf[TestSerializer].getName}" + } + serialization-bindings { + "${classOf[TestPayload].getName}" = persistence-tck-test + } + } + """) } /** @@ -30,12 +41,14 @@ object SnapshotStoreSpec { * @see [[akka.persistence.japi.snapshot.JavaSnapshotStoreSpec]] */ abstract class SnapshotStoreSpec(config: Config) extends PluginSpec(config) - with OptionalTests with SnapshotStoreCapabilityFlags { + with MayVerb with OptionalTests with SnapshotStoreCapabilityFlags { implicit lazy val system = ActorSystem("SnapshotStoreSpec", config.withFallback(SnapshotStoreSpec.config)) private var senderProbe: TestProbe = _ private var metadata: Seq[SnapshotMetadata] = Nil + override protected def supportsSerialization: CapabilityFlag = true + override protected def beforeEach(): Unit = { super.beforeEach() senderProbe = TestProbe() @@ -152,4 +165,23 @@ abstract class SnapshotStoreSpec(config: Config) extends PluginSpec(config) senderProbe.expectMsgPF() { case SaveSnapshotSuccess(md) ⇒ md } } } + + "A snapshot store optionally" may { + optional(flag = supportsSerialization) { + "serialize snapshots" in { + val probe = TestProbe() + val metadata = SnapshotMetadata(pid, 100) + val snap = TestPayload(probe.ref) + snapshotStore.tell(SaveSnapshot(metadata, snap), senderProbe.ref) + senderProbe.expectMsgPF() { case SaveSnapshotSuccess(md) ⇒ md } + + val Pid = pid + snapshotStore.tell(LoadSnapshot(pid, SnapshotSelectionCriteria.Latest, Long.MaxValue), senderProbe.ref) + senderProbe.expectMsgPF() { + case LoadSnapshotResult(Some(SelectedSnapshot(SnapshotMetadata(Pid, 100, _), payload)), Long.MaxValue) ⇒ + payload should be(snap) + } + } + } + } } diff --git a/akka-persistence-tck/src/test/scala/akka/persistence/journal/leveldb/LeveldbJournalJavaSpec.scala b/akka-persistence-tck/src/test/scala/akka/persistence/journal/leveldb/LeveldbJournalJavaSpec.scala index b3668b5ac2..f53269f584 100644 --- a/akka-persistence-tck/src/test/scala/akka/persistence/journal/leveldb/LeveldbJournalJavaSpec.scala +++ b/akka-persistence-tck/src/test/scala/akka/persistence/journal/leveldb/LeveldbJournalJavaSpec.scala @@ -15,4 +15,6 @@ class LeveldbJournalJavaSpec extends JournalSpec( with PluginCleanup { override def supportsRejectingNonSerializableObjects = true + + override def supportsSerialization = true } diff --git a/akka-persistence-tck/src/test/scala/akka/persistence/journal/leveldb/LeveldbJournalNativePerfSpec.scala b/akka-persistence-tck/src/test/scala/akka/persistence/journal/leveldb/LeveldbJournalNativePerfSpec.scala index 75136ac6d9..c4e70ac8ff 100644 --- a/akka-persistence-tck/src/test/scala/akka/persistence/journal/leveldb/LeveldbJournalNativePerfSpec.scala +++ b/akka-persistence-tck/src/test/scala/akka/persistence/journal/leveldb/LeveldbJournalNativePerfSpec.scala @@ -18,4 +18,6 @@ class LeveldbJournalNativePerfSpec extends JournalPerfSpec( override def supportsRejectingNonSerializableObjects = true + override def supportsSerialization = true + } diff --git a/akka-persistence-tck/src/test/scala/akka/persistence/journal/leveldb/LeveldbJournalNativeSpec.scala b/akka-persistence-tck/src/test/scala/akka/persistence/journal/leveldb/LeveldbJournalNativeSpec.scala index 7aa644449f..dad783bbac 100644 --- a/akka-persistence-tck/src/test/scala/akka/persistence/journal/leveldb/LeveldbJournalNativeSpec.scala +++ b/akka-persistence-tck/src/test/scala/akka/persistence/journal/leveldb/LeveldbJournalNativeSpec.scala @@ -16,4 +16,6 @@ class LeveldbJournalNativeSpec extends JournalSpec( override def supportsRejectingNonSerializableObjects = true + override def supportsSerialization = true + } diff --git a/akka-persistence-tck/src/test/scala/akka/persistence/journal/leveldb/LeveldbJournalNoAtomicPersistMultipleEventsSpec.scala b/akka-persistence-tck/src/test/scala/akka/persistence/journal/leveldb/LeveldbJournalNoAtomicPersistMultipleEventsSpec.scala index 25f6882cbc..5b7b83b0ad 100644 --- a/akka-persistence-tck/src/test/scala/akka/persistence/journal/leveldb/LeveldbJournalNoAtomicPersistMultipleEventsSpec.scala +++ b/akka-persistence-tck/src/test/scala/akka/persistence/journal/leveldb/LeveldbJournalNoAtomicPersistMultipleEventsSpec.scala @@ -21,5 +21,7 @@ class LeveldbJournalNoAtomicPersistMultipleEventsSpec extends JournalSpec( override def supportsRejectingNonSerializableObjects = true + override def supportsSerialization = true + } diff --git a/akka-persistence-tck/src/test/scala/akka/persistence/snapshot/local/LocalSnapshotStoreSpec.scala b/akka-persistence-tck/src/test/scala/akka/persistence/snapshot/local/LocalSnapshotStoreSpec.scala index f880cc8622..429fa6e1b6 100644 --- a/akka-persistence-tck/src/test/scala/akka/persistence/snapshot/local/LocalSnapshotStoreSpec.scala +++ b/akka-persistence-tck/src/test/scala/akka/persistence/snapshot/local/LocalSnapshotStoreSpec.scala @@ -4,8 +4,8 @@ package akka.persistence.snapshot.local +import akka.persistence.CapabilityFlag import com.typesafe.config.ConfigFactory - import akka.persistence.PluginCleanup import akka.persistence.snapshot.SnapshotStoreSpec @@ -16,4 +16,7 @@ class LocalSnapshotStoreSpec extends SnapshotStoreSpec( akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local" akka.persistence.snapshot-store.local.dir = "target/snapshots" """)) - with PluginCleanup + with PluginCleanup { + + override protected def supportsSerialization: CapabilityFlag = CapabilityFlag.on +} diff --git a/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala b/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala index 4dd7c88832..9df500a655 100644 --- a/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala +++ b/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala @@ -39,12 +39,6 @@ class MessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer override val includeManifest: Boolean = true - private lazy val transportInformation: Option[Serialization.Information] = { - val address = system.provider.getDefaultAddress - if (address.hasLocalScope) None - else Some(Serialization.Information(address, system)) - } - /** * Serializes persistent messages. Delegates serialization of a persistent * message's payload to a matching `akka.serialization.Serializer`. @@ -175,11 +169,12 @@ class MessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer builder } - // serialize actor references with full address information (defaultAddress) - transportInformation match { - case Some(ti) ⇒ Serialization.currentTransportInformation.withValue(ti) { payloadBuilder() } - case None ⇒ payloadBuilder() - } + val oldInfo = Serialization.currentTransportInformation.value + try { + if (oldInfo eq null) + Serialization.currentTransportInformation.value = system.provider.serializationInformation + payloadBuilder() + } finally Serialization.currentTransportInformation.value = oldInfo } // diff --git a/akka-persistence/src/main/scala/akka/persistence/serialization/SnapshotSerializer.scala b/akka-persistence/src/main/scala/akka/persistence/serialization/SnapshotSerializer.scala index df1c5413ed..51f13aa392 100644 --- a/akka-persistence/src/main/scala/akka/persistence/serialization/SnapshotSerializer.scala +++ b/akka-persistence/src/main/scala/akka/persistence/serialization/SnapshotSerializer.scala @@ -93,11 +93,12 @@ class SnapshotSerializer(val system: ExtendedActorSystem) extends BaseSerializer out.toByteArray } - // serialize actor references with full address information (defaultAddress) - transportInformation match { - case Some(ti) ⇒ Serialization.currentTransportInformation.withValue(ti) { serialize() } - case None ⇒ serialize() - } + val oldInfo = Serialization.currentTransportInformation.value + try { + if (oldInfo eq null) + Serialization.currentTransportInformation.value = system.provider.serializationInformation + serialize() + } finally Serialization.currentTransportInformation.value = oldInfo } private def snapshotFromBinary(bytes: Array[Byte]): AnyRef = { diff --git a/akka-persistence/src/main/scala/akka/persistence/snapshot/local/LocalSnapshotStore.scala b/akka-persistence/src/main/scala/akka/persistence/snapshot/local/LocalSnapshotStore.scala index d11a0ddde0..b01dd4113d 100644 --- a/akka-persistence/src/main/scala/akka/persistence/snapshot/local/LocalSnapshotStore.scala +++ b/akka-persistence/src/main/scala/akka/persistence/snapshot/local/LocalSnapshotStore.scala @@ -116,8 +116,9 @@ private[persistence] class LocalSnapshotStore(config: Config) extends SnapshotSt protected def deserialize(inputStream: InputStream): Snapshot = serializationExtension.deserialize(streamToBytes(inputStream), classOf[Snapshot]).get - protected def serialize(outputStream: OutputStream, snapshot: Snapshot): Unit = - outputStream.write(serializationExtension.findSerializerFor(snapshot).toBinary(snapshot)) + protected def serialize(outputStream: OutputStream, snapshot: Snapshot): Unit = { + outputStream.write(serializationExtension.serialize(snapshot).get) + } protected def withOutputStream(metadata: SnapshotMetadata)(p: (OutputStream) ⇒ Unit): File = { val tmpFile = snapshotFileForWrite(metadata, extension = "tmp") diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteRestartedQuarantinedSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteRestartedQuarantinedSpec.scala index 0c7dc4391a..91f62576b1 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteRestartedQuarantinedSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteRestartedQuarantinedSpec.scala @@ -131,7 +131,6 @@ abstract class RemoteRestartedQuarantinedSpec // retry because it's possible to loose the initial message here, see issue #17314 val probe = TestProbe()(freshSystem) probe.awaitAssert({ - println(s"# --") // FIXME freshSystem.actorSelection(RootActorPath(firstAddress) / "user" / "subject").tell(Identify("subject"), probe.ref) probe.expectMsgType[ActorIdentity](1.second).ref should not be (None) }, 30.seconds) diff --git a/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala b/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala index e066dce8f7..0b077cf295 100644 --- a/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala +++ b/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala @@ -43,7 +43,12 @@ private[akka] object MessageSerializer { val s = SerializationExtension(system) val serializer = s.findSerializerFor(message) val builder = SerializedMessage.newBuilder + + val oldInfo = Serialization.currentTransportInformation.value try { + if (oldInfo eq null) + Serialization.currentTransportInformation.value = system.provider.serializationInformation + builder.setMessage(ByteString.copyFrom(serializer.toBinary(message))) builder.setSerializerId(serializer.identifier) @@ -55,21 +60,27 @@ private[akka] object MessageSerializer { case NonFatal(e) ⇒ throw new SerializationException(s"Failed to serialize remote message [${message.getClass}] " + s"using serializer [${serializer.getClass}].", e) - } + } finally Serialization.currentTransportInformation.value = oldInfo } def serializeForArtery(serialization: Serialization, outboundEnvelope: OutboundEnvelope, headerBuilder: HeaderBuilder, envelope: EnvelopeBuffer): Unit = { val message = outboundEnvelope.message val serializer = serialization.findSerializerFor(message) + val oldInfo = Serialization.currentTransportInformation.value + try { + if (oldInfo eq null) + Serialization.currentTransportInformation.value = serialization.serializationInformation - headerBuilder setSerializer serializer.identifier - headerBuilder setManifest Serializers.manifestFor(serializer, message) - envelope.writeHeader(headerBuilder, outboundEnvelope) + headerBuilder.setSerializer(serializer.identifier) + headerBuilder.setManifest(Serializers.manifestFor(serializer, message)) + envelope.writeHeader(headerBuilder, outboundEnvelope) - serializer match { - case ser: ByteBufferSerializer ⇒ ser.toBinary(message, envelope.byteBuffer) - case _ ⇒ envelope.byteBuffer.put(serializer.toBinary(message)) - } + serializer match { + case ser: ByteBufferSerializer ⇒ ser.toBinary(message, envelope.byteBuffer) + case _ ⇒ envelope.byteBuffer.put(serializer.toBinary(message)) + } + + } finally Serialization.currentTransportInformation.value = oldInfo } def deserializeForArtery(system: ExtendedActorSystem, originUid: Long, serialization: Serialization, diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index f06468c335..b54e90e2a6 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -10,11 +10,9 @@ import akka.dispatch.sysmsg._ import akka.event.{ EventStream, Logging, LoggingAdapter } import akka.event.Logging.Error import akka.pattern.pipe - import scala.util.control.NonFatal import akka.actor.SystemGuardian.{ RegisterTerminationHook, TerminationHook, TerminationHookDone } - import scala.util.control.Exception.Catcher import scala.concurrent.Future @@ -29,6 +27,7 @@ import akka.remote.artery.OutboundEnvelope import akka.remote.artery.SystemMessageDelivery.SystemMessageEnvelope import akka.remote.serialization.ActorRefResolveThreadLocalCache import akka.remote.artery.tcp.ArteryTcpTransport +import akka.serialization.Serialization /** * INTERNAL API @@ -475,6 +474,21 @@ private[akka] class RemoteActorRefProvider( def getDefaultAddress: Address = transport.defaultAddress + // no need for volatile, only intended as cached value, not necessarily a singleton value + private var serializationInformationCache: OptionVal[Serialization.Information] = OptionVal.None + @InternalApi override private[akka] def serializationInformation: Serialization.Information = + serializationInformationCache match { + case OptionVal.Some(info) ⇒ info + case OptionVal.None ⇒ + if ((transport eq null) || (transport.defaultAddress eq null)) + local.serializationInformation // address not know yet, access before complete init and binding + else { + val info = Serialization.Information(transport.defaultAddress, transport.system) + serializationInformationCache = OptionVal.Some(info) + info + } + } + private def hasAddress(address: Address): Boolean = address == local.rootPath.address || address == rootPath.address || transport.addresses(address) @@ -508,6 +522,9 @@ private[akka] class RemoteActorRef private[akka] ( deploy: Option[Deploy]) extends InternalActorRef with RemoteRef { + if (path.address.hasLocalScope) + throw new IllegalArgumentException(s"Unexpected local address in RemoteActorRef [$this]") + remote match { case t: ArteryTransport ⇒ // detect mistakes such as using "akka.tcp" with Artery diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala index 633346b76b..04b63069e2 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala @@ -340,7 +340,7 @@ private[remote] abstract class ArteryTransport(_system: ExtendedActorSystem, _pr def bindAddress: UniqueAddress = _bindAddress override def localAddress: UniqueAddress = _localAddress - override def defaultAddress: Address = localAddress.address + override def defaultAddress: Address = if (_localAddress eq null) null else localAddress.address override def addresses: Set[Address] = _addresses override def localAddressForRemote(remote: Address): Address = defaultAddress diff --git a/akka-remote/src/main/scala/akka/remote/artery/Association.scala b/akka-remote/src/main/scala/akka/remote/artery/Association.scala index d9fd5a0786..cc75ff9de0 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Association.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Association.scala @@ -130,6 +130,8 @@ private[remote] class Association( import Association._ import FlightRecorderEvents._ + require(remoteAddress.port.nonEmpty) + private val log = Logging(transport.system, getClass.getName) private def flightRecorder = transport.topLevelFlightRecorder diff --git a/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala b/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala index f3816754f8..b3861c4a07 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala @@ -57,10 +57,9 @@ private[remote] class Encoder( val logic = new GraphStageLogic(shape) with InHandler with OutHandler with StageLogging with OutboundCompressionAccess { private val headerBuilder = HeaderBuilder.out() - headerBuilder setVersion version - headerBuilder setUid uniqueLocalAddress.uid + headerBuilder.setVersion(version) + headerBuilder.setUid(uniqueLocalAddress.uid) private val localAddress = uniqueLocalAddress.address - private val serializationInfo = Serialization.Information(localAddress, system) // lazy init of SerializationExtension to avoid loading serializers before ActorRefProvider has been initialized private var _serialization: OptionVal[Serialization] = OptionVal.None @@ -104,34 +103,34 @@ private[remote] class Encoder( // without depending on compression tables being in sync when systems are restarted headerBuilder.useOutboundCompression(!outboundEnvelope.message.isInstanceOf[ArteryMessage]) - // internally compression is applied by the builder: - outboundEnvelope.recipient match { - case OptionVal.Some(r) ⇒ headerBuilder setRecipientActorRef r - case OptionVal.None ⇒ headerBuilder.setNoRecipient() - } - + // Important to set Serialization.currentTransportInformation because setRecipientActorRef + // and setSenderActorRef are using using Serialization.serializedActorPath. + // Avoiding currentTransportInformation.withValue due to thunk allocation. + val oldInfo = Serialization.currentTransportInformation.value try { - // avoiding currentTransportInformation.withValue due to thunk allocation - val oldValue = Serialization.currentTransportInformation.value - try { - Serialization.currentTransportInformation.value = serializationInfo + Serialization.currentTransportInformation.value = serialization.serializationInformation - outboundEnvelope.sender match { - case OptionVal.None ⇒ headerBuilder.setNoSender() - case OptionVal.Some(s) ⇒ headerBuilder setSenderActorRef s - } + // internally compression is applied by the builder: + outboundEnvelope.recipient match { + case OptionVal.Some(r) ⇒ headerBuilder.setRecipientActorRef(r) + case OptionVal.None ⇒ headerBuilder.setNoRecipient() + } - val startTime: Long = if (instruments.timeSerialization) System.nanoTime else 0 - if (instruments.nonEmpty) - headerBuilder.setRemoteInstruments(instruments) + outboundEnvelope.sender match { + case OptionVal.None ⇒ headerBuilder.setNoSender() + case OptionVal.Some(s) ⇒ headerBuilder.setSenderActorRef(s) + } - MessageSerializer.serializeForArtery(serialization, outboundEnvelope, headerBuilder, envelope) + val startTime: Long = if (instruments.timeSerialization) System.nanoTime else 0 + if (instruments.nonEmpty) + headerBuilder.setRemoteInstruments(instruments) - if (instruments.nonEmpty) { - val time = if (instruments.timeSerialization) System.nanoTime - startTime else 0 - instruments.messageSent(outboundEnvelope, envelope.byteBuffer.position(), time) - } - } finally Serialization.currentTransportInformation.value = oldValue + MessageSerializer.serializeForArtery(serialization, outboundEnvelope, headerBuilder, envelope) + + if (instruments.nonEmpty) { + val time = if (instruments.timeSerialization) System.nanoTime - startTime else 0 + instruments.messageSent(outboundEnvelope, envelope.byteBuffer.position(), time) + } envelope.byteBuffer.flip() @@ -162,6 +161,7 @@ private[remote] class Encoder( pull(in) } } finally { + Serialization.currentTransportInformation.value = oldInfo outboundEnvelope match { case r: ReusableOutboundEnvelope ⇒ outboundEnvelopePool.release(r) case _ ⇒ // no need to release it diff --git a/akka-remote/src/main/scala/akka/remote/artery/EnvelopeBufferPool.scala b/akka-remote/src/main/scala/akka/remote/artery/EnvelopeBufferPool.scala index 5df4ae49f8..bf12a967d7 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/EnvelopeBufferPool.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/EnvelopeBufferPool.scala @@ -198,7 +198,7 @@ private[remote] sealed trait HeaderBuilder { private[remote] final class SerializationFormatCache extends LruBoundedCache[ActorRef, String](capacity = 1024, evictAgeThreshold = 600) { - override protected def compute(ref: ActorRef): String = ref.path.toSerializationFormat + override protected def compute(ref: ActorRef): String = Serialization.serializedActorPath(ref) // Not calling ref.hashCode since it does a path.hashCode if ActorCell.undefinedUid is encountered. // Refs with ActorCell.undefinedUid will now collide all the time, but this is not a usual scenario anyway. @@ -285,8 +285,11 @@ private[remote] final class HeaderBuilderImpl( } def outboundClassManifestCompression: CompressionTable[String] = _outboundClassManifestCompression + /** + * Note that Serialization.currentTransportInformation must be set when calling this method, + * because it's using `Serialization.serializedActorPath` + */ override def setSenderActorRef(ref: ActorRef): Unit = { - // serializedActorPath includes local address from `currentTransportInformation` if (_useOutboundCompression) { _senderActorRefIdx = outboundActorRefCompression.compress(ref) if (_senderActorRefIdx == -1) _senderActorRef = Serialization.serializedActorPath(ref) @@ -316,6 +319,10 @@ private[remote] final class HeaderBuilderImpl( def isNoRecipient: Boolean = (_recipientActorRef eq null) && _recipientActorRefIdx == DeadLettersCode + /** + * Note that Serialization.currentTransportInformation must be set when calling this method, + * because it's using `Serialization.serializedActorPath` + */ def setRecipientActorRef(ref: ActorRef): Unit = { if (_useOutboundCompression) { _recipientActorRefIdx = outboundActorRefCompression.compress(ref) diff --git a/akka-remote/src/test/scala/akka/remote/artery/SerializationTransportInformationSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/SerializationTransportInformationSpec.scala new file mode 100644 index 0000000000..a9099abecf --- /dev/null +++ b/akka-remote/src/test/scala/akka/remote/artery/SerializationTransportInformationSpec.scala @@ -0,0 +1,10 @@ +/** + * Copyright (C) 2018 Lightbend Inc. + */ + +package akka.remote.artery + +import akka.remote.serialization.AbstractSerializationTransportInformationSpec + +class SerializationTransportInformationSpec extends AbstractSerializationTransportInformationSpec( + ArterySpecSupport.defaultConfig) diff --git a/akka-remote/src/test/scala/akka/remote/serialization/SerializationTransportInformationSpec.scala b/akka-remote/src/test/scala/akka/remote/serialization/SerializationTransportInformationSpec.scala new file mode 100644 index 0000000000..5e7b69352f --- /dev/null +++ b/akka-remote/src/test/scala/akka/remote/serialization/SerializationTransportInformationSpec.scala @@ -0,0 +1,135 @@ +/** + * Copyright (C) 2018 Lightbend Inc. + */ + +package akka.remote.serialization + +import java.nio.charset.StandardCharsets + +import akka.actor.ActorIdentity +import akka.serialization.Serialization +import akka.actor.ActorRef +import akka.actor.ActorSystem +import akka.actor.ExtendedActorSystem +import akka.actor.Identify +import akka.actor.RootActorPath +import akka.remote.RARP +import akka.serialization.SerializerWithStringManifest +import akka.testkit.AkkaSpec +import akka.testkit.ImplicitSender +import akka.testkit.TestActors +import com.typesafe.config.Config +import com.typesafe.config.ConfigFactory + +object SerializationTransportInformationSpec { + + final case class TestMessage(from: ActorRef, to: ActorRef) + final case class JavaSerTestMessage(from: ActorRef, to: ActorRef) + + class TestSerializer(system: ExtendedActorSystem) extends SerializerWithStringManifest { + def identifier: Int = 666 + def manifest(o: AnyRef): String = o match { + case _: TestMessage ⇒ "A" + } + def toBinary(o: AnyRef): Array[Byte] = o match { + case TestMessage(from, to) ⇒ + verifyTransportInfo() + val fromStr = Serialization.serializedActorPath(from) + val toStr = Serialization.serializedActorPath(to) + s"$fromStr,$toStr".getBytes(StandardCharsets.UTF_8) + } + def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = { + verifyTransportInfo() + manifest match { + case "A" ⇒ + val parts = new String(bytes, StandardCharsets.UTF_8).split(',') + val fromStr = parts(0) + val toStr = parts(1) + val from = system.provider.resolveActorRef(fromStr) + val to = system.provider.resolveActorRef(toStr) + TestMessage(from, to) + } + } + + private def verifyTransportInfo(): Unit = { + Serialization.currentTransportInformation.value match { + case null ⇒ + throw new IllegalStateException("currentTransportInformation was not set") + case t ⇒ + if (t.system ne system) + throw new IllegalStateException( + s"wrong system in currentTransportInformation, ${t.system} != $system") + if (t.address != system.provider.getDefaultAddress) + throw new IllegalStateException( + s"wrong address in currentTransportInformation, ${t.address} != ${system.provider.getDefaultAddress}") + } + } + } +} + +abstract class AbstractSerializationTransportInformationSpec(config: Config) extends AkkaSpec( + config.withFallback(ConfigFactory.parseString( + """ + akka { + loglevel = info + actor { + provider = remote + warn-about-java-serializer-usage = off + serialize-creators = off + serializers { + test = "akka.remote.serialization.SerializationTransportInformationSpec$TestSerializer" + } + serialization-bindings { + "akka.remote.serialization.SerializationTransportInformationSpec$TestMessage" = test + "akka.remote.serialization.SerializationTransportInformationSpec$JavaSerTestMessage" = java + } + } + } + """))) with ImplicitSender { + + import SerializationTransportInformationSpec._ + + val port = system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress.port.get + val sysName = system.name + val protocol = + if (RARP(system).provider.remoteSettings.Artery.Enabled) "akka" + else "akka.tcp" + + val system2 = ActorSystem(system.name, system.settings.config) + val system2Address = RARP(system2).provider.getDefaultAddress + + "Serialization of ActorRef in remote message" must { + + "resolve address" in { + system2.actorOf(TestActors.echoActorProps, "echo") + + val echoSel = system.actorSelection(RootActorPath(system2Address) / "user" / "echo") + echoSel ! Identify(1) + val echo = expectMsgType[ActorIdentity].ref.get + + echo ! TestMessage(testActor, echo) + expectMsg(TestMessage(testActor, echo)) + + echo ! JavaSerTestMessage(testActor, echo) + expectMsg(JavaSerTestMessage(testActor, echo)) + + echo ! testActor + expectMsg(testActor) + + echo ! echo + expectMsg(echo) + + } + } + + override def afterTermination(): Unit = { + shutdown(system2) + } +} + +class SerializationTransportInformationSpec extends AbstractSerializationTransportInformationSpec(ConfigFactory.parseString(""" + akka.remote.netty.tcp { + hostname = localhost + port = 0 + } +""")) diff --git a/akka-stream/src/main/scala/akka/stream/impl/streamref/SourceRefImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/streamref/SourceRefImpl.scala index 10ff7d24a6..8dcccf2c41 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/streamref/SourceRefImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/streamref/SourceRefImpl.scala @@ -218,7 +218,7 @@ private[stream] final class SourceRefStageImpl[Out]( } // else, ref is valid and we don't need to do anything with it } - /** @throws InvalidSequenceNumberException when sequence number is is invalid */ + /** @throws InvalidSequenceNumberException when sequence number is invalid */ def observeAndValidateSequenceNr(seqNr: Long, msg: String): Unit = if (isInvalidSequenceNr(seqNr)) { throw InvalidSequenceNumberException(expectingSeqNr, seqNr, msg)