Make sure Serialization.currentTransportInformation is always set, #25067

* The ThreadLocal Serialization.currentTransportInformation is used for serializing local
  actor refs, but it's also useful when a serializer library e.g. custom serializer/deserializer
  in Jackson need access to the current ActorSystem.
* We set this in a rather ad-hoc way from remoting and in some persistence plugins, but it's only
  set for serialization and not deserialization, and it's easy for Persistence plugins or other
  libraries to forget this when using Akka serialization directly.
* This change is automatically setting the info when using the ordinary serialize and deserialize
  methods.
* It's also set when LocalActorRefProvider, which wasn't always the case previously.
* Keep a cached instance of Serialization.Information in the provider to avoid
  creating new instances all the time.
* Added optional Persistence TCK tests to verify that the plugin is setting this
  if it's using some custom calls to the serializer.
This commit is contained in:
Patrik Nordwall 2018-05-21 16:59:04 +02:00 committed by GitHub
parent 6ec46e762f
commit e6633f17fa
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
36 changed files with 579 additions and 146 deletions

View file

@ -313,7 +313,7 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout {
(intercept[java.lang.IllegalStateException] {
in.readObject
}).getMessage should ===("Trying to deserialize a serialized ActorRef without an ActorSystem in scope." +
" Use 'akka.serialization.Serialization.currentSystem.withValue(system) { ... }'")
" Use 'akka.serialization.JavaSerializer.currentSystem.withValue(system) { ... }'")
}
"return EmptyLocalActorRef on deserialize if not present in actor hierarchy (and remoting is not enabled)" in {

View file

@ -1,3 +1,6 @@
# #25067 Serialization.Information
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.actor.ActorRefProvider.serializationInformation")
# #24646 java.time.Duration
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.actor.AbstractActor#ActorContext.cancelReceiveTimeout")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.actor.AbstractActor#ActorContext.setReceiveTimeout")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.actor.AbstractActor#ActorContext.setReceiveTimeout")

View file

@ -422,7 +422,7 @@ private[akka] final case class SerializedActorRef private (path: String) {
case null
throw new IllegalStateException(
"Trying to deserialize a serialized ActorRef without an ActorSystem in scope." +
" Use 'akka.serialization.Serialization.currentSystem.withValue(system) { ... }'")
" Use 'akka.serialization.JavaSerializer.currentSystem.withValue(system) { ... }'")
case someSystem
someSystem.provider.resolveActorRef(path)
}

View file

@ -5,23 +5,30 @@
package akka.actor
import akka.dispatch.sysmsg._
import akka.dispatch.{ UnboundedMessageQueueSemantics, RequiresMessageQueue }
import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics }
import akka.routing._
import akka.event._
import akka.util.{ Helpers }
import akka.util.Helpers
import akka.japi.Util.immutableSeq
import akka.util.Collections.EmptyImmutableSeq
import scala.util.control.NonFatal
import java.util.concurrent.atomic.AtomicLong
import scala.concurrent.{ ExecutionContextExecutor, Future, Promise }
import scala.annotation.implicitNotFound
import akka.ConfigurationException
import akka.annotation.DoNotInherit
import akka.annotation.InternalApi
import akka.dispatch.Mailboxes
import akka.serialization.Serialization
import akka.util.OptionVal
/**
* Interface for all ActorRef providers to implement.
* Not intended for extension outside of Akka.
*/
trait ActorRefProvider {
@DoNotInherit trait ActorRefProvider {
/**
* Reference to the supervisor of guardian and systemGuardian; this is
@ -179,6 +186,9 @@ trait ActorRefProvider {
* Obtain the external address of the default transport.
*/
def getDefaultAddress: Address
/** INTERNAL API */
@InternalApi private[akka] def serializationInformation: Serialization.Information
}
/**
@ -795,4 +805,21 @@ private[akka] class LocalActorRefProvider private[akka] (
def getExternalAddressFor(addr: Address): Option[Address] = if (addr == rootPath.address) Some(addr) else None
def getDefaultAddress: Address = rootPath.address
// no need for volatile, only intended as cached value, not necessarily a singleton value
private var serializationInformationCache: OptionVal[Serialization.Information] = OptionVal.None
@InternalApi override private[akka] def serializationInformation: Serialization.Information = {
Serialization.Information(getDefaultAddress, system)
serializationInformationCache match {
case OptionVal.Some(info) info
case OptionVal.None
if (system eq null)
throw new IllegalStateException("Too early access of serializationInformation")
else {
val info = Serialization.Information(rootPath.address, system)
serializationInformationCache = OptionVal.Some(info)
info
}
}
}
}

View file

@ -177,7 +177,7 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi
val system = akka.serialization.JavaSerializer.currentSystem.value
if (system eq null) throw new IllegalStateException(
"Trying to deserialize a SerializedMethodCall without an ActorSystem in scope." +
" Use akka.serialization.Serialization.currentSystem.withValue(system) { ... }")
" Use akka.serialization.JavaSerializer.currentSystem.withValue(system) { ... }")
val serialization = SerializationExtension(system)
MethodCall(ownerType.getDeclaredMethod(methodName, parameterTypes: _*), serializedParameters match {
case null null
@ -443,7 +443,8 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi
*/
private[akka] final case class SerializedTypedActorInvocationHandler(val actor: ActorRef, val timeout: FiniteDuration) {
@throws(classOf[ObjectStreamException]) private def readResolve(): AnyRef = JavaSerializer.currentSystem.value match {
case null throw new IllegalStateException("SerializedTypedActorInvocationHandler.readResolve requires that JavaSerializer.currentSystem.value is set to a non-null value")
case null throw new IllegalStateException("SerializedTypedActorInvocationHandler.readResolve requires that " +
"JavaSerializer.currentSystem.value is set to a non-null value")
case some toTypedActorInvocationHandler(some)
}

View file

@ -7,8 +7,9 @@ package akka.actor.dungeon
import scala.annotation.tailrec
import scala.util.control.NonFatal
import scala.collection.immutable
import akka.actor._
import akka.serialization.{ SerializationExtension, Serializers }
import akka.serialization.{ Serialization, SerializationExtension, Serializers }
import akka.util.{ Helpers, Unsafe }
import java.util.Optional
@ -241,13 +242,16 @@ private[akka] trait Children { this: ActorCell ⇒
}
private def makeChild(cell: ActorCell, props: Props, name: String, async: Boolean, systemService: Boolean): ActorRef = {
if (cell.system.settings.SerializeAllCreators && !systemService && props.deploy.scope != LocalScope)
if (cell.system.settings.SerializeAllCreators && !systemService && props.deploy.scope != LocalScope) {
val oldInfo = Serialization.currentTransportInformation.value
try {
val ser = SerializationExtension(cell.system)
if (oldInfo eq null)
Serialization.currentTransportInformation.value = system.provider.serializationInformation
props.args forall (arg
arg == null ||
arg.isInstanceOf[NoSerializationVerificationNeeded] ||
{
arg.isInstanceOf[NoSerializationVerificationNeeded] || {
val o = arg.asInstanceOf[AnyRef]
val serializer = ser.findSerializerFor(o)
val bytes = serializer.toBinary(o)
@ -256,7 +260,9 @@ private[akka] trait Children { this: ActorCell ⇒
})
} catch {
case NonFatal(e) throw new IllegalArgumentException(s"pre-creation serialization check failed at [${cell.self.path}/$name]", e)
}
} finally Serialization.currentTransportInformation.value = oldInfo
}
/*
* in case we are currently terminating, fail external attachChild requests
* (internal calls cannot happen anyway because we are suspended)

View file

@ -5,6 +5,7 @@
package akka.actor.dungeon
import scala.annotation.tailrec
import akka.AkkaException
import akka.dispatch.{ Envelope, Mailbox }
import akka.dispatch.sysmsg._
@ -12,12 +13,13 @@ import akka.event.Logging.Error
import akka.util.Unsafe
import akka.actor._
import akka.serialization.{ DisabledJavaSerializer, SerializationExtension, Serializers }
import scala.util.control.{ NoStackTrace, NonFatal }
import scala.util.control.Exception.Catcher
import akka.dispatch.MailboxType
import akka.dispatch.ProducesMessageQueue
import akka.dispatch.UnboundedMailbox
import akka.serialization.Serialization
@SerialVersionUID(1L)
final case class SerializationCheckFailedException private (msg: Object, cause: Throwable)
@ -169,9 +171,14 @@ private[akka] trait Dispatch { this: ActorCell ⇒
if (serializer.isInstanceOf[DisabledJavaSerializer] && !s.shouldWarnAboutJavaSerializer(obj.getClass, serializer))
obj // skip check for known "local" messages
else {
val bytes = serializer.toBinary(obj)
val ms = Serializers.manifestFor(serializer, obj)
s.deserialize(bytes, serializer.identifier, ms).get
val oldInfo = Serialization.currentTransportInformation.value
try {
if (oldInfo eq null)
Serialization.currentTransportInformation.value = system.provider.serializationInformation
val bytes = serializer.toBinary(obj)
val ms = Serializers.manifestFor(serializer, obj)
s.deserialize(bytes, serializer.identifier, ms).get
} finally Serialization.currentTransportInformation.value = oldInfo
}
}

View file

@ -31,11 +31,11 @@ object Serialization {
type ClassSerializer = (Class[_], Serializer)
/**
* This holds a reference to the current transport serialization information used for
* serializing local actor refs.
* INTERNAL API
* INTERNAL API: This holds a reference to the current transport serialization information used for
* serializing local actor refs, or if serializer library e.g. custom serializer/deserializer in
* Jackson need access to the current `ActorSystem`.
*/
private[akka] val currentTransportInformation = new DynamicVariable[Information](null)
@InternalApi private[akka] val currentTransportInformation = new DynamicVariable[Information](null)
class Settings(val config: Config) {
val Serializers: Map[String, String] = configToMap(config.getConfig("akka.actor.serializers"))
@ -66,16 +66,11 @@ object Serialization {
}
}
/**
* Serialization information needed for serializing local actor refs.
* INTERNAL API
*/
private[akka] final case class Information(address: Address, system: ActorSystem)
/**
* The serialized path of an actorRef, based on the current transport serialization information.
* If there is no external address available for the requested address then the systems default
* address will be used.
* If there is no external address available in the given `ActorRef` then the systems default
* address will be used and that is retrieved from the ThreadLocal `Serialization.Information`
* that was set with [[Serialization#withTransportInformation]].
*/
def serializedActorPath(actorRef: ActorRef): String = {
val path = actorRef.path
@ -101,20 +96,47 @@ object Serialization {
}
/**
* Use the specified @param system to determine transport information that will be used when serializing actorRefs
* in @param f code: if there is no external address available for the requested address then the systems default
* address will be used.
* Serialization information needed for serializing local actor refs,
* or if serializer library e.g. custom serializer/deserializer in Jackson need
* access to the current `ActorSystem`.
*/
final case class Information(address: Address, system: ActorSystem)
/**
* Sets serialization information in a `ThreadLocal` and runs `f`. The information is
* needed for serializing local actor refs, or if serializer library e.g. custom serializer/deserializer
* in Jackson need access to the current `ActorSystem`. The current [[Information]] can be accessed within
* `f` via [[Serialization#getCurrentTransportInformation]].
*
* @return value returned by @param f
* Akka Remoting sets this value when serializing and deserializing messages, and when using
* the ordinary `serialize` and `deserialize` methods in [[Serialization]] the value is also
* set automatically.
*
* @return value returned by `f`
*/
def withTransportInformation[T](system: ExtendedActorSystem)(f: () T): T = {
val address = system.provider.getDefaultAddress
if (address.hasLocalScope) {
f()
} else {
Serialization.currentTransportInformation.withValue(Serialization.Information(address, system)) {
val info = system.provider.serializationInformation
if (Serialization.currentTransportInformation.value eq info)
f() // already set
else
Serialization.currentTransportInformation.withValue(info) {
f()
}
}
/**
* Gets the serialization information from a `ThreadLocal` that was assigned via
* [[Serialization#withTransportInformation]]. The information is needed for serializing
* local actor refs, or if serializer library e.g. custom serializer/deserializer
* in Jackson need access to the current `ActorSystem`.
*
* @throws IllegalStateException if the information was not set
*/
def getCurrentTransportInformation(): Information = {
Serialization.currentTransportInformation.value match {
case null throw new IllegalStateException(
"currentTransportInformation is not set, use Serialization.withTransportInformation")
case t t
}
}
@ -134,11 +156,28 @@ class Serialization(val system: ExtendedActorSystem) extends Extension {
val log: LoggingAdapter = _log
private val manifestCache = new AtomicReference[Map[String, Option[Class[_]]]](Map.empty[String, Option[Class[_]]])
/** INTERNAL API */
@InternalApi private[akka] def serializationInformation: Serialization.Information =
system.provider.serializationInformation
private def withTransportInformation[T](f: () T): T = {
val oldInfo = Serialization.currentTransportInformation.value
try {
if (oldInfo eq null)
Serialization.currentTransportInformation.value = serializationInformation
f()
} finally Serialization.currentTransportInformation.value = oldInfo
}
/**
* Serializes the given AnyRef/java.lang.Object according to the Serialization configuration
* to either an Array of Bytes or an Exception if one was thrown.
*/
def serialize(o: AnyRef): Try[Array[Byte]] = Try(findSerializerFor(o).toBinary(o))
def serialize(o: AnyRef): Try[Array[Byte]] = {
withTransportInformation { ()
Try(findSerializerFor(o).toBinary(o))
}
}
/**
* Deserializes the given array of bytes using the specified serializer id,
@ -152,7 +191,9 @@ class Serialization(val system: ExtendedActorSystem) extends Extension {
s"Cannot find serializer with id [$serializerId]. The most probable reason is that the configuration entry " +
"akka.actor.serializers is not in synch between the two systems.")
}
serializer.fromBinary(bytes, clazz).asInstanceOf[T]
withTransportInformation { ()
serializer.fromBinary(bytes, clazz).asInstanceOf[T]
}
}
/**
@ -177,27 +218,29 @@ class Serialization(val system: ExtendedActorSystem) extends Extension {
updateCache(manifestCache.get, key, value) // recursive, try again
}
serializer match {
case s2: SerializerWithStringManifest s2.fromBinary(bytes, manifest)
case s1
if (manifest == "")
s1.fromBinary(bytes, None)
else {
val cache = manifestCache.get
cache.get(manifest) match {
case Some(cachedClassManifest) s1.fromBinary(bytes, cachedClassManifest)
case None
system.dynamicAccess.getClassFor[AnyRef](manifest) match {
case Success(classManifest)
val classManifestOption: Option[Class[_]] = Some(classManifest)
updateCache(cache, manifest, classManifestOption)
s1.fromBinary(bytes, classManifestOption)
case Failure(e)
throw new NotSerializableException(
s"Cannot find manifest class [$manifest] for serializer with id [${serializer.identifier}].")
}
withTransportInformation { ()
serializer match {
case s2: SerializerWithStringManifest s2.fromBinary(bytes, manifest)
case s1
if (manifest == "")
s1.fromBinary(bytes, None)
else {
val cache = manifestCache.get
cache.get(manifest) match {
case Some(cachedClassManifest) s1.fromBinary(bytes, cachedClassManifest)
case None
system.dynamicAccess.getClassFor[AnyRef](manifest) match {
case Success(classManifest)
val classManifestOption: Option[Class[_]] = Some(classManifest)
updateCache(cache, manifest, classManifestOption)
s1.fromBinary(bytes, classManifestOption)
case Failure(e)
throw new NotSerializableException(
s"Cannot find manifest class [$manifest] for serializer with id [${serializer.identifier}].")
}
}
}
}
}
}
}
@ -213,22 +256,34 @@ class Serialization(val system: ExtendedActorSystem) extends Extension {
s"Cannot find serializer with id [$serializerId]. The most probable reason is that the configuration entry " +
"akka.actor.serializers is not in synch between the two systems.")
}
serializer match {
case ser: ByteBufferSerializer
ser.fromBinary(buf, manifest)
case _
val bytes = new Array[Byte](buf.remaining())
buf.get(bytes)
deserializeByteArray(bytes, serializer, manifest)
}
// not using `withTransportInformation { () =>` because deserializeByteBuffer is supposed to be the
// possibility for allocation free serialization
val oldInfo = Serialization.currentTransportInformation.value
try {
if (oldInfo eq null)
Serialization.currentTransportInformation.value = serializationInformation
serializer match {
case ser: ByteBufferSerializer
ser.fromBinary(buf, manifest)
case _
val bytes = new Array[Byte](buf.remaining())
buf.get(bytes)
deserializeByteArray(bytes, serializer, manifest)
}
} finally Serialization.currentTransportInformation.value = oldInfo
}
/**
* Deserializes the given array of bytes using the specified type to look up what Serializer should be used.
* Returns either the resulting object or an Exception if one was thrown.
*/
def deserialize[T](bytes: Array[Byte], clazz: Class[T]): Try[T] =
Try(serializerFor(clazz).fromBinary(bytes, Some(clazz)).asInstanceOf[T])
def deserialize[T](bytes: Array[Byte], clazz: Class[T]): Try[T] = {
withTransportInformation { ()
Try(serializerFor(clazz).fromBinary(bytes, Some(clazz)).asInstanceOf[T])
}
}
/**
* Returns the Serializer configured for the given object, returns the NullSerializer if it's null.

View file

@ -288,13 +288,13 @@ object JavaSerializer {
* If you are using Serializers yourself, outside of SerializationExtension,
* you'll need to surround the serialization/deserialization with:
*
* currentSystem.withValue(system) {
* JavaSerializer.currentSystem.withValue(system) {
* ...code...
* }
*
* or
*
* currentSystem.withValue(system, callable)
* JavaSerializer.currentSystem.withValue(system, callable)
*/
val currentSystem = new CurrentSystem
final class CurrentSystem extends DynamicVariable[ExtendedActorSystem](null) {

View file

@ -149,11 +149,14 @@ trait SerializationSupport {
// Serialize actor references with full address information (defaultAddress).
// When sending remote messages currentTransportInformation is already set,
// but when serializing for digests it must be set here.
if (Serialization.currentTransportInformation.value == null)
Serialization.currentTransportInformation.withValue(transportInformation) { buildOther() }
else
// but when serializing for digests or DurableStore it must be set here.
val oldInfo = Serialization.currentTransportInformation.value
try {
if (oldInfo eq null)
Serialization.currentTransportInformation.value = system.provider.serializationInformation
buildOther()
} finally Serialization.currentTransportInformation.value = oldInfo
}
def otherMessageFromBinary(bytes: Array[Byte]): AnyRef =

View file

@ -193,6 +193,9 @@ object PersistenceTCKDoc {
override def supportsRejectingNonSerializableObjects: CapabilityFlag =
false // or CapabilityFlag.off
override def supportsSerialization: CapabilityFlag =
true // or CapabilityFlag.on
}
//#journal-tck-scala
}
@ -204,7 +207,11 @@ object PersistenceTCKDoc {
config = ConfigFactory.parseString(
"""
akka.persistence.snapshot-store.plugin = "my.snapshot-store.plugin"
"""))
""")) {
override def supportsSerialization: CapabilityFlag =
true // or CapabilityFlag.on
}
//#snapshot-store-tck-scala
}
new AnyRef {

View file

@ -44,11 +44,21 @@ trait JournalCapabilityFlags extends CapabilityFlags {
*/
protected def supportsRejectingNonSerializableObjects: CapabilityFlag
/**
* When `true` enables tests which check if the Journal properly serialize and
* deserialize events.
*/
protected def supportsSerialization: CapabilityFlag
}
//#journal-flags
//#snapshot-store-flags
trait SnapshotStoreCapabilityFlags extends CapabilityFlags {
// no flags currently
/**
* When `true` enables tests which check if the snapshot store properly serialize and
* deserialize snapshots.
*/
protected def supportsSerialization: CapabilityFlag
}
//#snapshot-store-flags

View file

@ -0,0 +1,50 @@
/**
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence
import java.nio.charset.StandardCharsets
import akka.actor.ActorRef
import akka.actor.ExtendedActorSystem
import akka.serialization.Serialization
import akka.serialization.SerializerWithStringManifest
final case class TestPayload(ref: ActorRef)
class TestSerializer(system: ExtendedActorSystem) extends SerializerWithStringManifest {
def identifier: Int = 666
def manifest(o: AnyRef): String = o match {
case _: TestPayload "A"
}
def toBinary(o: AnyRef): Array[Byte] = o match {
case TestPayload(ref)
verifyTransportInfo()
val refStr = Serialization.serializedActorPath(ref)
refStr.getBytes(StandardCharsets.UTF_8)
}
def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = {
verifyTransportInfo()
manifest match {
case "A"
val refStr = new String(bytes, StandardCharsets.UTF_8)
val ref = system.provider.resolveActorRef(refStr)
TestPayload(ref)
}
}
private def verifyTransportInfo(): Unit = {
Serialization.currentTransportInformation.value match {
case null
throw new IllegalStateException("currentTransportInformation was not set")
case t
if (t.system ne system)
throw new IllegalStateException(
s"wrong system in currentTransportInformation, ${t.system} != $system")
if (t.address != system.provider.getDefaultAddress)
throw new IllegalStateException(
s"wrong address in currentTransportInformation, ${t.address} != ${system.provider.getDefaultAddress}")
}
}
}

View file

@ -50,4 +50,6 @@ class JavaJournalPerfSpec(config: Config) extends JournalPerfSpec(config) {
}
override protected def supportsRejectingNonSerializableObjects: CapabilityFlag = CapabilityFlag.on
override protected def supportsSerialization: CapabilityFlag = CapabilityFlag.on
}

View file

@ -22,4 +22,6 @@ import com.typesafe.config.Config
*/
class JavaJournalSpec(config: Config) extends JournalSpec(config) {
override protected def supportsRejectingNonSerializableObjects: CapabilityFlag = CapabilityFlag.on
override protected def supportsSerialization: CapabilityFlag = CapabilityFlag.on
}

View file

@ -4,7 +4,8 @@
package akka.persistence.japi.snapshot
import akka.persistence.snapshot.{ SnapshotStoreSpec }
import akka.persistence.CapabilityFlag
import akka.persistence.snapshot.SnapshotStoreSpec
import com.typesafe.config.Config
/**
@ -18,4 +19,6 @@ import com.typesafe.config.Config
*
* @see [[akka.persistence.snapshot.SnapshotStoreSpec]]
*/
class JavaSnapshotStoreSpec(config: Config) extends SnapshotStoreSpec(config)
class JavaSnapshotStoreSpec(config: Config) extends SnapshotStoreSpec(config) {
override protected def supportsSerialization: CapabilityFlag = CapabilityFlag.on
}

View file

@ -17,9 +17,17 @@ import akka.testkit._
import com.typesafe.config._
object JournalSpec {
val config = ConfigFactory.parseString(
"""
val config: Config = ConfigFactory.parseString(
s"""
akka.persistence.publish-plugin-commands = on
akka.actor {
serializers {
persistence-tck-test = "${classOf[TestSerializer].getName}"
}
serialization-bindings {
"${classOf[TestPayload].getName}" = persistence-tck-test
}
}
""")
}
@ -43,6 +51,8 @@ abstract class JournalSpec(config: Config) extends PluginSpec(config) with MayVe
private var senderProbe: TestProbe = _
private var receiverProbe: TestProbe = _
override protected def supportsSerialization: CapabilityFlag = true
override protected def beforeEach(): Unit = {
super.beforeEach()
senderProbe = TestProbe()
@ -230,5 +240,31 @@ abstract class JournalSpec(config: Config) extends PluginSpec(config) with MayVe
}
}
}
optional(flag = supportsSerialization) {
"serialize events" in {
val probe = TestProbe()
val event = TestPayload(probe.ref)
val aw =
AtomicWrite(PersistentRepr(payload = event, sequenceNr = 6L, persistenceId = pid, sender = Actor.noSender,
writerUuid = writerUuid))
journal ! WriteMessages(List(aw), probe.ref, actorInstanceId)
probe.expectMsg(WriteMessagesSuccessful)
val Pid = pid
val WriterUuid = writerUuid
probe.expectMsgPF() {
case WriteMessageSuccess(PersistentImpl(payload, 6L, Pid, _, _, Actor.noSender, WriterUuid), _) payload should be(event)
}
journal ! ReplayMessages(6, Long.MaxValue, Long.MaxValue, pid, receiverProbe.ref)
receiverProbe.expectMsgPF() {
case ReplayedMessage(PersistentImpl(payload, 6L, Pid, _, _, Actor.noSender, WriterUuid)) payload should be(event)
}
receiverProbe.expectMsg(RecoverySuccess(highestSequenceNr = 6L))
}
}
}
}

View file

@ -4,7 +4,7 @@
package akka.persistence.snapshot
import akka.persistence.scalatest.OptionalTests
import akka.persistence.scalatest.{ MayVerb, OptionalTests }
import scala.collection.immutable.Seq
import akka.actor._
@ -15,7 +15,18 @@ import com.typesafe.config.ConfigFactory
import com.typesafe.config.Config
object SnapshotStoreSpec {
val config = ConfigFactory.parseString("akka.persistence.publish-plugin-commands = on")
val config: Config = ConfigFactory.parseString(
s"""
akka.persistence.publish-plugin-commands = on
akka.actor {
serializers {
persistence-tck-test = "${classOf[TestSerializer].getName}"
}
serialization-bindings {
"${classOf[TestPayload].getName}" = persistence-tck-test
}
}
""")
}
/**
@ -30,12 +41,14 @@ object SnapshotStoreSpec {
* @see [[akka.persistence.japi.snapshot.JavaSnapshotStoreSpec]]
*/
abstract class SnapshotStoreSpec(config: Config) extends PluginSpec(config)
with OptionalTests with SnapshotStoreCapabilityFlags {
with MayVerb with OptionalTests with SnapshotStoreCapabilityFlags {
implicit lazy val system = ActorSystem("SnapshotStoreSpec", config.withFallback(SnapshotStoreSpec.config))
private var senderProbe: TestProbe = _
private var metadata: Seq[SnapshotMetadata] = Nil
override protected def supportsSerialization: CapabilityFlag = true
override protected def beforeEach(): Unit = {
super.beforeEach()
senderProbe = TestProbe()
@ -152,4 +165,23 @@ abstract class SnapshotStoreSpec(config: Config) extends PluginSpec(config)
senderProbe.expectMsgPF() { case SaveSnapshotSuccess(md) md }
}
}
"A snapshot store optionally" may {
optional(flag = supportsSerialization) {
"serialize snapshots" in {
val probe = TestProbe()
val metadata = SnapshotMetadata(pid, 100)
val snap = TestPayload(probe.ref)
snapshotStore.tell(SaveSnapshot(metadata, snap), senderProbe.ref)
senderProbe.expectMsgPF() { case SaveSnapshotSuccess(md) md }
val Pid = pid
snapshotStore.tell(LoadSnapshot(pid, SnapshotSelectionCriteria.Latest, Long.MaxValue), senderProbe.ref)
senderProbe.expectMsgPF() {
case LoadSnapshotResult(Some(SelectedSnapshot(SnapshotMetadata(Pid, 100, _), payload)), Long.MaxValue)
payload should be(snap)
}
}
}
}
}

View file

@ -15,4 +15,6 @@ class LeveldbJournalJavaSpec extends JournalSpec(
with PluginCleanup {
override def supportsRejectingNonSerializableObjects = true
override def supportsSerialization = true
}

View file

@ -18,4 +18,6 @@ class LeveldbJournalNativePerfSpec extends JournalPerfSpec(
override def supportsRejectingNonSerializableObjects = true
override def supportsSerialization = true
}

View file

@ -16,4 +16,6 @@ class LeveldbJournalNativeSpec extends JournalSpec(
override def supportsRejectingNonSerializableObjects = true
override def supportsSerialization = true
}

View file

@ -21,5 +21,7 @@ class LeveldbJournalNoAtomicPersistMultipleEventsSpec extends JournalSpec(
override def supportsRejectingNonSerializableObjects = true
override def supportsSerialization = true
}

View file

@ -4,8 +4,8 @@
package akka.persistence.snapshot.local
import akka.persistence.CapabilityFlag
import com.typesafe.config.ConfigFactory
import akka.persistence.PluginCleanup
import akka.persistence.snapshot.SnapshotStoreSpec
@ -16,4 +16,7 @@ class LocalSnapshotStoreSpec extends SnapshotStoreSpec(
akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"
akka.persistence.snapshot-store.local.dir = "target/snapshots"
"""))
with PluginCleanup
with PluginCleanup {
override protected def supportsSerialization: CapabilityFlag = CapabilityFlag.on
}

View file

@ -39,12 +39,6 @@ class MessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer
override val includeManifest: Boolean = true
private lazy val transportInformation: Option[Serialization.Information] = {
val address = system.provider.getDefaultAddress
if (address.hasLocalScope) None
else Some(Serialization.Information(address, system))
}
/**
* Serializes persistent messages. Delegates serialization of a persistent
* message's payload to a matching `akka.serialization.Serializer`.
@ -175,11 +169,12 @@ class MessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer
builder
}
// serialize actor references with full address information (defaultAddress)
transportInformation match {
case Some(ti) Serialization.currentTransportInformation.withValue(ti) { payloadBuilder() }
case None payloadBuilder()
}
val oldInfo = Serialization.currentTransportInformation.value
try {
if (oldInfo eq null)
Serialization.currentTransportInformation.value = system.provider.serializationInformation
payloadBuilder()
} finally Serialization.currentTransportInformation.value = oldInfo
}
//

View file

@ -93,11 +93,12 @@ class SnapshotSerializer(val system: ExtendedActorSystem) extends BaseSerializer
out.toByteArray
}
// serialize actor references with full address information (defaultAddress)
transportInformation match {
case Some(ti) Serialization.currentTransportInformation.withValue(ti) { serialize() }
case None serialize()
}
val oldInfo = Serialization.currentTransportInformation.value
try {
if (oldInfo eq null)
Serialization.currentTransportInformation.value = system.provider.serializationInformation
serialize()
} finally Serialization.currentTransportInformation.value = oldInfo
}
private def snapshotFromBinary(bytes: Array[Byte]): AnyRef = {

View file

@ -116,8 +116,9 @@ private[persistence] class LocalSnapshotStore(config: Config) extends SnapshotSt
protected def deserialize(inputStream: InputStream): Snapshot =
serializationExtension.deserialize(streamToBytes(inputStream), classOf[Snapshot]).get
protected def serialize(outputStream: OutputStream, snapshot: Snapshot): Unit =
outputStream.write(serializationExtension.findSerializerFor(snapshot).toBinary(snapshot))
protected def serialize(outputStream: OutputStream, snapshot: Snapshot): Unit = {
outputStream.write(serializationExtension.serialize(snapshot).get)
}
protected def withOutputStream(metadata: SnapshotMetadata)(p: (OutputStream) Unit): File = {
val tmpFile = snapshotFileForWrite(metadata, extension = "tmp")

View file

@ -131,7 +131,6 @@ abstract class RemoteRestartedQuarantinedSpec
// retry because it's possible to loose the initial message here, see issue #17314
val probe = TestProbe()(freshSystem)
probe.awaitAssert({
println(s"# --") // FIXME
freshSystem.actorSelection(RootActorPath(firstAddress) / "user" / "subject").tell(Identify("subject"), probe.ref)
probe.expectMsgType[ActorIdentity](1.second).ref should not be (None)
}, 30.seconds)

View file

@ -43,7 +43,12 @@ private[akka] object MessageSerializer {
val s = SerializationExtension(system)
val serializer = s.findSerializerFor(message)
val builder = SerializedMessage.newBuilder
val oldInfo = Serialization.currentTransportInformation.value
try {
if (oldInfo eq null)
Serialization.currentTransportInformation.value = system.provider.serializationInformation
builder.setMessage(ByteString.copyFrom(serializer.toBinary(message)))
builder.setSerializerId(serializer.identifier)
@ -55,21 +60,27 @@ private[akka] object MessageSerializer {
case NonFatal(e)
throw new SerializationException(s"Failed to serialize remote message [${message.getClass}] " +
s"using serializer [${serializer.getClass}].", e)
}
} finally Serialization.currentTransportInformation.value = oldInfo
}
def serializeForArtery(serialization: Serialization, outboundEnvelope: OutboundEnvelope, headerBuilder: HeaderBuilder, envelope: EnvelopeBuffer): Unit = {
val message = outboundEnvelope.message
val serializer = serialization.findSerializerFor(message)
val oldInfo = Serialization.currentTransportInformation.value
try {
if (oldInfo eq null)
Serialization.currentTransportInformation.value = serialization.serializationInformation
headerBuilder setSerializer serializer.identifier
headerBuilder setManifest Serializers.manifestFor(serializer, message)
envelope.writeHeader(headerBuilder, outboundEnvelope)
headerBuilder.setSerializer(serializer.identifier)
headerBuilder.setManifest(Serializers.manifestFor(serializer, message))
envelope.writeHeader(headerBuilder, outboundEnvelope)
serializer match {
case ser: ByteBufferSerializer ser.toBinary(message, envelope.byteBuffer)
case _ envelope.byteBuffer.put(serializer.toBinary(message))
}
serializer match {
case ser: ByteBufferSerializer ser.toBinary(message, envelope.byteBuffer)
case _ envelope.byteBuffer.put(serializer.toBinary(message))
}
} finally Serialization.currentTransportInformation.value = oldInfo
}
def deserializeForArtery(system: ExtendedActorSystem, originUid: Long, serialization: Serialization,

View file

@ -10,11 +10,9 @@ import akka.dispatch.sysmsg._
import akka.event.{ EventStream, Logging, LoggingAdapter }
import akka.event.Logging.Error
import akka.pattern.pipe
import scala.util.control.NonFatal
import akka.actor.SystemGuardian.{ RegisterTerminationHook, TerminationHook, TerminationHookDone }
import scala.util.control.Exception.Catcher
import scala.concurrent.Future
@ -29,6 +27,7 @@ import akka.remote.artery.OutboundEnvelope
import akka.remote.artery.SystemMessageDelivery.SystemMessageEnvelope
import akka.remote.serialization.ActorRefResolveThreadLocalCache
import akka.remote.artery.tcp.ArteryTcpTransport
import akka.serialization.Serialization
/**
* INTERNAL API
@ -475,6 +474,21 @@ private[akka] class RemoteActorRefProvider(
def getDefaultAddress: Address = transport.defaultAddress
// no need for volatile, only intended as cached value, not necessarily a singleton value
private var serializationInformationCache: OptionVal[Serialization.Information] = OptionVal.None
@InternalApi override private[akka] def serializationInformation: Serialization.Information =
serializationInformationCache match {
case OptionVal.Some(info) info
case OptionVal.None
if ((transport eq null) || (transport.defaultAddress eq null))
local.serializationInformation // address not know yet, access before complete init and binding
else {
val info = Serialization.Information(transport.defaultAddress, transport.system)
serializationInformationCache = OptionVal.Some(info)
info
}
}
private def hasAddress(address: Address): Boolean =
address == local.rootPath.address || address == rootPath.address || transport.addresses(address)
@ -508,6 +522,9 @@ private[akka] class RemoteActorRef private[akka] (
deploy: Option[Deploy])
extends InternalActorRef with RemoteRef {
if (path.address.hasLocalScope)
throw new IllegalArgumentException(s"Unexpected local address in RemoteActorRef [$this]")
remote match {
case t: ArteryTransport
// detect mistakes such as using "akka.tcp" with Artery

View file

@ -340,7 +340,7 @@ private[remote] abstract class ArteryTransport(_system: ExtendedActorSystem, _pr
def bindAddress: UniqueAddress = _bindAddress
override def localAddress: UniqueAddress = _localAddress
override def defaultAddress: Address = localAddress.address
override def defaultAddress: Address = if (_localAddress eq null) null else localAddress.address
override def addresses: Set[Address] = _addresses
override def localAddressForRemote(remote: Address): Address = defaultAddress

View file

@ -130,6 +130,8 @@ private[remote] class Association(
import Association._
import FlightRecorderEvents._
require(remoteAddress.port.nonEmpty)
private val log = Logging(transport.system, getClass.getName)
private def flightRecorder = transport.topLevelFlightRecorder

View file

@ -57,10 +57,9 @@ private[remote] class Encoder(
val logic = new GraphStageLogic(shape) with InHandler with OutHandler with StageLogging with OutboundCompressionAccess {
private val headerBuilder = HeaderBuilder.out()
headerBuilder setVersion version
headerBuilder setUid uniqueLocalAddress.uid
headerBuilder.setVersion(version)
headerBuilder.setUid(uniqueLocalAddress.uid)
private val localAddress = uniqueLocalAddress.address
private val serializationInfo = Serialization.Information(localAddress, system)
// lazy init of SerializationExtension to avoid loading serializers before ActorRefProvider has been initialized
private var _serialization: OptionVal[Serialization] = OptionVal.None
@ -104,34 +103,34 @@ private[remote] class Encoder(
// without depending on compression tables being in sync when systems are restarted
headerBuilder.useOutboundCompression(!outboundEnvelope.message.isInstanceOf[ArteryMessage])
// internally compression is applied by the builder:
outboundEnvelope.recipient match {
case OptionVal.Some(r) headerBuilder setRecipientActorRef r
case OptionVal.None headerBuilder.setNoRecipient()
}
// Important to set Serialization.currentTransportInformation because setRecipientActorRef
// and setSenderActorRef are using using Serialization.serializedActorPath.
// Avoiding currentTransportInformation.withValue due to thunk allocation.
val oldInfo = Serialization.currentTransportInformation.value
try {
// avoiding currentTransportInformation.withValue due to thunk allocation
val oldValue = Serialization.currentTransportInformation.value
try {
Serialization.currentTransportInformation.value = serializationInfo
Serialization.currentTransportInformation.value = serialization.serializationInformation
outboundEnvelope.sender match {
case OptionVal.None headerBuilder.setNoSender()
case OptionVal.Some(s) headerBuilder setSenderActorRef s
}
// internally compression is applied by the builder:
outboundEnvelope.recipient match {
case OptionVal.Some(r) headerBuilder.setRecipientActorRef(r)
case OptionVal.None headerBuilder.setNoRecipient()
}
val startTime: Long = if (instruments.timeSerialization) System.nanoTime else 0
if (instruments.nonEmpty)
headerBuilder.setRemoteInstruments(instruments)
outboundEnvelope.sender match {
case OptionVal.None headerBuilder.setNoSender()
case OptionVal.Some(s) headerBuilder.setSenderActorRef(s)
}
MessageSerializer.serializeForArtery(serialization, outboundEnvelope, headerBuilder, envelope)
val startTime: Long = if (instruments.timeSerialization) System.nanoTime else 0
if (instruments.nonEmpty)
headerBuilder.setRemoteInstruments(instruments)
if (instruments.nonEmpty) {
val time = if (instruments.timeSerialization) System.nanoTime - startTime else 0
instruments.messageSent(outboundEnvelope, envelope.byteBuffer.position(), time)
}
} finally Serialization.currentTransportInformation.value = oldValue
MessageSerializer.serializeForArtery(serialization, outboundEnvelope, headerBuilder, envelope)
if (instruments.nonEmpty) {
val time = if (instruments.timeSerialization) System.nanoTime - startTime else 0
instruments.messageSent(outboundEnvelope, envelope.byteBuffer.position(), time)
}
envelope.byteBuffer.flip()
@ -162,6 +161,7 @@ private[remote] class Encoder(
pull(in)
}
} finally {
Serialization.currentTransportInformation.value = oldInfo
outboundEnvelope match {
case r: ReusableOutboundEnvelope outboundEnvelopePool.release(r)
case _ // no need to release it

View file

@ -198,7 +198,7 @@ private[remote] sealed trait HeaderBuilder {
private[remote] final class SerializationFormatCache
extends LruBoundedCache[ActorRef, String](capacity = 1024, evictAgeThreshold = 600) {
override protected def compute(ref: ActorRef): String = ref.path.toSerializationFormat
override protected def compute(ref: ActorRef): String = Serialization.serializedActorPath(ref)
// Not calling ref.hashCode since it does a path.hashCode if ActorCell.undefinedUid is encountered.
// Refs with ActorCell.undefinedUid will now collide all the time, but this is not a usual scenario anyway.
@ -285,8 +285,11 @@ private[remote] final class HeaderBuilderImpl(
}
def outboundClassManifestCompression: CompressionTable[String] = _outboundClassManifestCompression
/**
* Note that Serialization.currentTransportInformation must be set when calling this method,
* because it's using `Serialization.serializedActorPath`
*/
override def setSenderActorRef(ref: ActorRef): Unit = {
// serializedActorPath includes local address from `currentTransportInformation`
if (_useOutboundCompression) {
_senderActorRefIdx = outboundActorRefCompression.compress(ref)
if (_senderActorRefIdx == -1) _senderActorRef = Serialization.serializedActorPath(ref)
@ -316,6 +319,10 @@ private[remote] final class HeaderBuilderImpl(
def isNoRecipient: Boolean =
(_recipientActorRef eq null) && _recipientActorRefIdx == DeadLettersCode
/**
* Note that Serialization.currentTransportInformation must be set when calling this method,
* because it's using `Serialization.serializedActorPath`
*/
def setRecipientActorRef(ref: ActorRef): Unit = {
if (_useOutboundCompression) {
_recipientActorRefIdx = outboundActorRefCompression.compress(ref)

View file

@ -0,0 +1,10 @@
/**
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.remote.artery
import akka.remote.serialization.AbstractSerializationTransportInformationSpec
class SerializationTransportInformationSpec extends AbstractSerializationTransportInformationSpec(
ArterySpecSupport.defaultConfig)

View file

@ -0,0 +1,135 @@
/**
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.remote.serialization
import java.nio.charset.StandardCharsets
import akka.actor.ActorIdentity
import akka.serialization.Serialization
import akka.actor.ActorRef
import akka.actor.ActorSystem
import akka.actor.ExtendedActorSystem
import akka.actor.Identify
import akka.actor.RootActorPath
import akka.remote.RARP
import akka.serialization.SerializerWithStringManifest
import akka.testkit.AkkaSpec
import akka.testkit.ImplicitSender
import akka.testkit.TestActors
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
object SerializationTransportInformationSpec {
final case class TestMessage(from: ActorRef, to: ActorRef)
final case class JavaSerTestMessage(from: ActorRef, to: ActorRef)
class TestSerializer(system: ExtendedActorSystem) extends SerializerWithStringManifest {
def identifier: Int = 666
def manifest(o: AnyRef): String = o match {
case _: TestMessage "A"
}
def toBinary(o: AnyRef): Array[Byte] = o match {
case TestMessage(from, to)
verifyTransportInfo()
val fromStr = Serialization.serializedActorPath(from)
val toStr = Serialization.serializedActorPath(to)
s"$fromStr,$toStr".getBytes(StandardCharsets.UTF_8)
}
def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = {
verifyTransportInfo()
manifest match {
case "A"
val parts = new String(bytes, StandardCharsets.UTF_8).split(',')
val fromStr = parts(0)
val toStr = parts(1)
val from = system.provider.resolveActorRef(fromStr)
val to = system.provider.resolveActorRef(toStr)
TestMessage(from, to)
}
}
private def verifyTransportInfo(): Unit = {
Serialization.currentTransportInformation.value match {
case null
throw new IllegalStateException("currentTransportInformation was not set")
case t
if (t.system ne system)
throw new IllegalStateException(
s"wrong system in currentTransportInformation, ${t.system} != $system")
if (t.address != system.provider.getDefaultAddress)
throw new IllegalStateException(
s"wrong address in currentTransportInformation, ${t.address} != ${system.provider.getDefaultAddress}")
}
}
}
}
abstract class AbstractSerializationTransportInformationSpec(config: Config) extends AkkaSpec(
config.withFallback(ConfigFactory.parseString(
"""
akka {
loglevel = info
actor {
provider = remote
warn-about-java-serializer-usage = off
serialize-creators = off
serializers {
test = "akka.remote.serialization.SerializationTransportInformationSpec$TestSerializer"
}
serialization-bindings {
"akka.remote.serialization.SerializationTransportInformationSpec$TestMessage" = test
"akka.remote.serialization.SerializationTransportInformationSpec$JavaSerTestMessage" = java
}
}
}
"""))) with ImplicitSender {
import SerializationTransportInformationSpec._
val port = system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress.port.get
val sysName = system.name
val protocol =
if (RARP(system).provider.remoteSettings.Artery.Enabled) "akka"
else "akka.tcp"
val system2 = ActorSystem(system.name, system.settings.config)
val system2Address = RARP(system2).provider.getDefaultAddress
"Serialization of ActorRef in remote message" must {
"resolve address" in {
system2.actorOf(TestActors.echoActorProps, "echo")
val echoSel = system.actorSelection(RootActorPath(system2Address) / "user" / "echo")
echoSel ! Identify(1)
val echo = expectMsgType[ActorIdentity].ref.get
echo ! TestMessage(testActor, echo)
expectMsg(TestMessage(testActor, echo))
echo ! JavaSerTestMessage(testActor, echo)
expectMsg(JavaSerTestMessage(testActor, echo))
echo ! testActor
expectMsg(testActor)
echo ! echo
expectMsg(echo)
}
}
override def afterTermination(): Unit = {
shutdown(system2)
}
}
class SerializationTransportInformationSpec extends AbstractSerializationTransportInformationSpec(ConfigFactory.parseString("""
akka.remote.netty.tcp {
hostname = localhost
port = 0
}
"""))

View file

@ -218,7 +218,7 @@ private[stream] final class SourceRefStageImpl[Out](
} // else, ref is valid and we don't need to do anything with it
}
/** @throws InvalidSequenceNumberException when sequence number is is invalid */
/** @throws InvalidSequenceNumberException when sequence number is invalid */
def observeAndValidateSequenceNr(seqNr: Long, msg: String): Unit =
if (isInvalidSequenceNr(seqNr)) {
throw InvalidSequenceNumberException(expectingSeqNr, seqNr, msg)