+all #16632 Make serialization identifiers configurable in reference.conf

This commit is contained in:
Andrei Pozolotin 2015-03-05 11:55:05 -06:00
parent 064eea6180
commit 6332f888ce
17 changed files with 76 additions and 44 deletions

View file

@ -493,6 +493,17 @@ akka {
"java.io.Serializable" = java "java.io.Serializable" = java
} }
# Configuration namespace of serialization identifiers.
# Each serializer implementation must have an entry in the following format:
# `akka.actor.serialization-identifiers."FQCN" = ID`
# where `FQCN` is fully qualified class name of the serializer implementation
# and `ID` is globally unique serializer identifier number.
# Identifier values from 0 to 16 are reserved for Akka internal usage.
serialization-identifiers {
"akka.serialization.JavaSerializer" = 1
"akka.serialization.ByteArraySerializer" = 4
}
# Configuration items which are used by the akka.actor.ActorDSL._ methods # Configuration items which are used by the akka.actor.ActorDSL._ methods
dsl { dsl {
# Maximum queue size of the actor created by newInbox(); this protects # Maximum queue size of the actor created by newInbox(); this protects

View file

@ -33,8 +33,8 @@ import akka.serialization.JavaSerializer.CurrentSystem
trait Serializer { trait Serializer {
/** /**
* Completely unique value to identify this implementation of Serializer, used to optimize network traffic * Completely unique value to identify this implementation of Serializer, used to optimize network traffic.
* Values from 0 to 16 is reserved for Akka internal usage * Values from 0 to 16 are reserved for Akka internal usage.
*/ */
def identifier: Int def identifier: Int
@ -65,6 +65,33 @@ trait Serializer {
final def fromBinary(bytes: Array[Byte], clazz: Class[_]): AnyRef = fromBinary(bytes, Option(clazz)) final def fromBinary(bytes: Array[Byte], clazz: Class[_]): AnyRef = fromBinary(bytes, Option(clazz))
} }
/**
* Base serializer trait with serialization identifiers configuration contract,
* when globally unique serialization identifier is configured in the `reference.conf`.
*/
trait BaseSerializer extends Serializer {
/**
* Actor system which is required by most serializer implementations.
*/
val system: ExtendedActorSystem
/**
* Configuration namespace of serialization identifiers in the `reference.conf`.
*
* Each serializer implementation must have an entry in the following format:
* `akka.actor.serialization-identifiers."FQCN" = ID`
* where `FQCN` is fully qualified class name of the serializer implementation
* and `ID` is globally unique serializer identifier number.
*/
final val SerializationIdentifiers = "akka.actor.serialization-identifiers"
/**
* Globally unique serialization identifier configured in the `reference.conf`.
*
* See [[Serializer#identifier()]].
*/
final override val identifier: Int =
system.settings.config.getInt(s"""${SerializationIdentifiers}."${getClass.getName}"""")
}
/** /**
* Java API for creating a Serializer: make sure to include a constructor which * Java API for creating a Serializer: make sure to include a constructor which
* takes exactly one argument of type [[akka.actor.ExtendedActorSystem]], because * takes exactly one argument of type [[akka.actor.ExtendedActorSystem]], because
@ -117,12 +144,10 @@ object JavaSerializer {
/** /**
* This Serializer uses standard Java Serialization * This Serializer uses standard Java Serialization
*/ */
class JavaSerializer(val system: ExtendedActorSystem) extends Serializer { class JavaSerializer(val system: ExtendedActorSystem) extends BaseSerializer {
def includeManifest: Boolean = false def includeManifest: Boolean = false
def identifier = 1
def toBinary(o: AnyRef): Array[Byte] = { def toBinary(o: AnyRef): Array[Byte] = {
val bos = new ByteArrayOutputStream val bos = new ByteArrayOutputStream
val out = new ObjectOutputStream(bos) val out = new ObjectOutputStream(bos)
@ -154,9 +179,8 @@ class NullSerializer extends Serializer {
* This is a special Serializer that Serializes and deserializes byte arrays only, * This is a special Serializer that Serializes and deserializes byte arrays only,
* (just returns the byte array unchanged/uncopied) * (just returns the byte array unchanged/uncopied)
*/ */
class ByteArraySerializer extends Serializer { class ByteArraySerializer(val system: ExtendedActorSystem) extends BaseSerializer {
def includeManifest: Boolean = false def includeManifest: Boolean = false
def identifier = 4
def toBinary(o: AnyRef) = o match { def toBinary(o: AnyRef) = o match {
case null null case null null
case o: Array[Byte] o case o: Array[Byte] o

View file

@ -29,8 +29,6 @@ akka.cluster.metrics {
# Sigar native library extract location. # Sigar native library extract location.
# Use per-application-instance scoped location, such as program working directory. # Use per-application-instance scoped location, such as program working directory.
native-library-extract-folder = ${user.dir}"/native" native-library-extract-folder = ${user.dir}"/native"
# Unique serialization identifier. Must not conflict with any other in an akka system.
serializer-identifier = 10
# Metrics supervisor actor. # Metrics supervisor actor.
supervisor { supervisor {
# Actor name. Example name space: /system/cluster-metrics # Actor name. Example name space: /system/cluster-metrics
@ -97,6 +95,10 @@ akka.actor {
serialization-bindings { serialization-bindings {
"akka.cluster.metrics.ClusterMetricsMessage" = akka-cluster-metrics "akka.cluster.metrics.ClusterMetricsMessage" = akka-cluster-metrics
} }
# Globally unique metrics extension serializer identifier.
serialization-identifiers {
"akka.cluster.metrics.protobuf.MessageSerializer" = 10
}
# Provide routing of messages based on cluster metrics. # Provide routing of messages based on cluster metrics.
router.type-mapping { router.type-mapping {
cluster-metrics-adaptive-pool = "akka.cluster.metrics.AdaptiveLoadBalancingPool" cluster-metrics-adaptive-pool = "akka.cluster.metrics.AdaptiveLoadBalancingPool"

View file

@ -25,7 +25,6 @@ case class ClusterMetricsSettings(config: Config) {
} }
val PeriodicTasksInitialDelay: FiniteDuration = cc.getMillisDuration("periodic-tasks-initial-delay") val PeriodicTasksInitialDelay: FiniteDuration = cc.getMillisDuration("periodic-tasks-initial-delay")
val NativeLibraryExtractFolder: String = cc.getString("native-library-extract-folder") val NativeLibraryExtractFolder: String = cc.getString("native-library-extract-folder")
val SerializerIdentifier: Int = cc.getInt("serializer-identifier")
// Supervisor. // Supervisor.
val SupervisorName: String = cc.getString("supervisor.name") val SupervisorName: String = cc.getString("supervisor.name")

View file

@ -11,7 +11,7 @@ import java.{ lang ⇒ jl }
import akka.actor.{ Address, ExtendedActorSystem } import akka.actor.{ Address, ExtendedActorSystem }
import akka.cluster.metrics.protobuf.msg.{ ClusterMetricsMessages cm } import akka.cluster.metrics.protobuf.msg.{ ClusterMetricsMessages cm }
import akka.cluster.metrics.{ ClusterMetricsMessage, ClusterMetricsSettings, EWMA, Metric, MetricsGossip, MetricsGossipEnvelope, NodeMetrics } import akka.cluster.metrics.{ ClusterMetricsMessage, ClusterMetricsSettings, EWMA, Metric, MetricsGossip, MetricsGossipEnvelope, NodeMetrics }
import akka.serialization.Serializer import akka.serialization.BaseSerializer
import akka.util.ClassLoaderObjectInputStream import akka.util.ClassLoaderObjectInputStream
import com.google.protobuf.{ ByteString, MessageLite } import com.google.protobuf.{ ByteString, MessageLite }
@ -21,7 +21,7 @@ import scala.collection.JavaConverters.{ asJavaIterableConverter, asScalaBufferC
/** /**
* Protobuf serializer for [[ClusterMetricsMessage]] types. * Protobuf serializer for [[ClusterMetricsMessage]] types.
*/ */
class MessageSerializer(val system: ExtendedActorSystem) extends Serializer { class MessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer {
private final val BufferSize = 4 * 1024 private final val BufferSize = 4 * 1024
@ -30,8 +30,6 @@ class MessageSerializer(val system: ExtendedActorSystem) extends Serializer {
override val includeManifest: Boolean = true override val includeManifest: Boolean = true
override val identifier = ClusterMetricsSettings(system.settings.config).SerializerIdentifier
override def toBinary(obj: AnyRef): Array[Byte] = obj match { override def toBinary(obj: AnyRef): Array[Byte] = obj match {
case m: MetricsGossipEnvelope case m: MetricsGossipEnvelope
compress(metricsGossipEnvelopeToProto(m)) compress(metricsGossipEnvelopeToProto(m))

View file

@ -25,7 +25,6 @@ class ClusterMetricsSettingsSpec extends AkkaSpec {
MetricsDispatcher should be(Dispatchers.DefaultDispatcherId) MetricsDispatcher should be(Dispatchers.DefaultDispatcherId)
PeriodicTasksInitialDelay should be(1 second) PeriodicTasksInitialDelay should be(1 second)
NativeLibraryExtractFolder should be(System.getProperty("user.dir") + "/native") NativeLibraryExtractFolder should be(System.getProperty("user.dir") + "/native")
SerializerIdentifier should be(10)
// Supervisor. // Supervisor.
SupervisorName should be("cluster-metrics") SupervisorName should be("cluster-metrics")

View file

@ -226,6 +226,10 @@ akka {
"akka.cluster.ClusterMessage" = akka-cluster "akka.cluster.ClusterMessage" = akka-cluster
} }
serialization-identifiers {
"akka.cluster.protobuf.ClusterMessageSerializer" = 5
}
router.type-mapping { router.type-mapping {
adaptive-pool = "akka.cluster.routing.AdaptiveLoadBalancingPool" adaptive-pool = "akka.cluster.routing.AdaptiveLoadBalancingPool"
adaptive-group = "akka.cluster.routing.AdaptiveLoadBalancingGroup" adaptive-group = "akka.cluster.routing.AdaptiveLoadBalancingGroup"

View file

@ -10,7 +10,7 @@ import java.{ lang ⇒ jl }
import akka.actor.{ Address, ExtendedActorSystem } import akka.actor.{ Address, ExtendedActorSystem }
import akka.cluster._ import akka.cluster._
import akka.cluster.protobuf.msg.{ ClusterMessages cm } import akka.cluster.protobuf.msg.{ ClusterMessages cm }
import akka.serialization.Serializer import akka.serialization.BaseSerializer
import akka.util.ClassLoaderObjectInputStream import akka.util.ClassLoaderObjectInputStream
import com.google.protobuf.{ ByteString, MessageLite } import com.google.protobuf.{ ByteString, MessageLite }
@ -22,7 +22,7 @@ import scala.concurrent.duration.Deadline
/** /**
* Protobuf serializer of cluster messages. * Protobuf serializer of cluster messages.
*/ */
class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializer { class ClusterMessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer {
private final val BufferSize = 1024 * 4 private final val BufferSize = 1024 * 4
// must be lazy because serializer is initialized from Cluster extension constructor // must be lazy because serializer is initialized from Cluster extension constructor
@ -53,8 +53,6 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializ
def includeManifest: Boolean = true def includeManifest: Boolean = true
def identifier = 5
def toBinary(obj: AnyRef): Array[Byte] = obj match { def toBinary(obj: AnyRef): Array[Byte] = obj match {
case ClusterHeartbeatSender.Heartbeat(from) addressToProtoByteArray(from) case ClusterHeartbeatSender.Heartbeat(from) addressToProtoByteArray(from)
case ClusterHeartbeatSender.HeartbeatRsp(from) uniqueAddressToProtoByteArray(from) case ClusterHeartbeatSender.HeartbeatRsp(from) uniqueAddressToProtoByteArray(from)

View file

@ -40,6 +40,9 @@ akka.actor {
serialization-bindings { serialization-bindings {
"akka.contrib.pattern.DistributedPubSubMessage" = akka-pubsub "akka.contrib.pattern.DistributedPubSubMessage" = akka-pubsub
} }
serialization-identifiers {
"akka.contrib.pattern.protobuf.DistributedPubSubMessageSerializer" = 9
}
} }

View file

@ -3,7 +3,7 @@
*/ */
package akka.contrib.pattern.protobuf package akka.contrib.pattern.protobuf
import akka.serialization.Serializer import akka.serialization.BaseSerializer
import akka.cluster._ import akka.cluster._
import scala.collection.breakOut import scala.collection.breakOut
import akka.actor.{ ExtendedActorSystem, Address } import akka.actor.{ ExtendedActorSystem, Address }
@ -31,7 +31,7 @@ import scala.collection.immutable.TreeMap
/** /**
* Protobuf serializer of DistributedPubSubMediator messages. * Protobuf serializer of DistributedPubSubMediator messages.
*/ */
class DistributedPubSubMessageSerializer(val system: ExtendedActorSystem) extends Serializer { class DistributedPubSubMessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer {
private final val BufferSize = 1024 * 4 private final val BufferSize = 1024 * 4
@ -44,8 +44,6 @@ class DistributedPubSubMessageSerializer(val system: ExtendedActorSystem) extend
def includeManifest: Boolean = true def includeManifest: Boolean = true
def identifier = 9
def toBinary(obj: AnyRef): Array[Byte] = obj match { def toBinary(obj: AnyRef): Array[Byte] = obj match {
case m: Status compress(statusToProto(m)) case m: Status compress(statusToProto(m))
case m: Delta compress(deltaToProto(m)) case m: Delta compress(deltaToProto(m))

View file

@ -23,15 +23,13 @@ trait Message extends Serializable
/** /**
* Protobuf serializer for [[PersistentRepr]] and [[AtLeastOnceDelivery]] messages. * Protobuf serializer for [[PersistentRepr]] and [[AtLeastOnceDelivery]] messages.
*/ */
class MessageSerializer(val system: ExtendedActorSystem) extends Serializer { class MessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer {
import PersistentRepr.Undefined import PersistentRepr.Undefined
val PersistentReprClass = classOf[PersistentRepr] val PersistentReprClass = classOf[PersistentRepr]
val PersistentImplClass = classOf[PersistentImpl] val PersistentImplClass = classOf[PersistentImpl]
val AtLeastOnceDeliverySnapshotClass = classOf[AtLeastOnceDeliverySnap] val AtLeastOnceDeliverySnapshotClass = classOf[AtLeastOnceDeliverySnap]
val SerializationIdentifiers = "akka.actor.serialization-identifiers" // TODO move to [[Serializer]]
override val identifier: Int = system.settings.config.getInt(s"""${SerializationIdentifiers}."${getClass.getName}"""")
override val includeManifest: Boolean = true override val includeManifest: Boolean = true
private lazy val transportInformation: Option[Serialization.Information] = { private lazy val transportInformation: Option[Serialization.Information] = {

View file

@ -7,8 +7,7 @@ package akka.persistence.serialization
import java.io._ import java.io._
import akka.actor._ import akka.actor._
import akka.serialization.{ Serializer, SerializationExtension } import akka.serialization._
import akka.serialization.Serialization
import scala.util.Success import scala.util.Success
import scala.util.Failure import scala.util.Failure
@ -30,10 +29,8 @@ private[serialization] final case class SnapshotHeader(serializerId: Int, manife
/** /**
* [[Snapshot]] serializer. * [[Snapshot]] serializer.
*/ */
class SnapshotSerializer(system: ExtendedActorSystem) extends Serializer { class SnapshotSerializer(val system: ExtendedActorSystem) extends BaseSerializer {
val SerializationIdentifiers = "akka.actor.serialization-identifiers" // TODO move to [[Serializer]]
override val identifier: Int = system.settings.config.getInt(s"""${SerializationIdentifiers}."${getClass.getName}"""")
override val includeManifest: Boolean = false override val includeManifest: Boolean = false
private lazy val transportInformation: Option[Serialization.Information] = { private lazy val transportInformation: Option[Serialization.Information] = {

View file

@ -18,7 +18,6 @@ akka {
daemon-create = "akka.remote.serialization.DaemonMsgCreateSerializer" daemon-create = "akka.remote.serialization.DaemonMsgCreateSerializer"
} }
serialization-bindings { serialization-bindings {
# Since com.google.protobuf.Message does not extend Serializable but # Since com.google.protobuf.Message does not extend Serializable but
# GeneratedMessage does, need to use the more specific one here in order # GeneratedMessage does, need to use the more specific one here in order
@ -28,6 +27,12 @@ akka {
"akka.remote.DaemonMsgCreate" = daemon-create "akka.remote.DaemonMsgCreate" = daemon-create
} }
serialization-identifiers {
"akka.remote.serialization.ProtobufSerializer" = 2
"akka.remote.serialization.DaemonMsgCreateSerializer" = 3
"akka.remote.serialization.MessageContainerSerializer" = 6
}
deployment { deployment {
default { default {

View file

@ -4,7 +4,7 @@
package akka.remote.serialization package akka.remote.serialization
import akka.serialization.{ Serializer, SerializationExtension } import akka.serialization.{ BaseSerializer, SerializationExtension }
import java.io.Serializable import java.io.Serializable
import com.google.protobuf.ByteString import com.google.protobuf.ByteString
import com.typesafe.config.{ Config, ConfigFactory } import com.typesafe.config.{ Config, ConfigFactory }
@ -24,13 +24,13 @@ import util.{ Failure, Success }
* *
* INTERNAL API * INTERNAL API
*/ */
private[akka] class DaemonMsgCreateSerializer(val system: ExtendedActorSystem) extends Serializer { private[akka] class DaemonMsgCreateSerializer(val system: ExtendedActorSystem) extends BaseSerializer {
import ProtobufSerializer.serializeActorRef import ProtobufSerializer.serializeActorRef
import ProtobufSerializer.deserializeActorRef import ProtobufSerializer.deserializeActorRef
import Deploy.NoDispatcherGiven import Deploy.NoDispatcherGiven
def includeManifest: Boolean = false def includeManifest: Boolean = false
def identifier = 3
lazy val serialization = SerializationExtension(system) lazy val serialization = SerializationExtension(system)
def toBinary(obj: AnyRef): Array[Byte] = obj match { def toBinary(obj: AnyRef): Array[Byte] = obj match {

View file

@ -13,11 +13,9 @@ import akka.actor.SelectParent
import akka.actor.SelectionPathElement import akka.actor.SelectionPathElement
import akka.remote.ContainerFormats import akka.remote.ContainerFormats
import akka.serialization.SerializationExtension import akka.serialization.SerializationExtension
import akka.serialization.Serializer import akka.serialization.BaseSerializer
class MessageContainerSerializer(val system: ExtendedActorSystem) extends Serializer { class MessageContainerSerializer(val system: ExtendedActorSystem) extends BaseSerializer {
def identifier: Int = 6
def includeManifest: Boolean = false def includeManifest: Boolean = false

View file

@ -9,7 +9,7 @@ import java.util.concurrent.atomic.AtomicReference
import akka.actor.{ ActorRef, ExtendedActorSystem } import akka.actor.{ ActorRef, ExtendedActorSystem }
import akka.remote.WireFormats.ActorRefData import akka.remote.WireFormats.ActorRefData
import akka.serialization.{ Serialization, Serializer } import akka.serialization.{ Serialization, BaseSerializer }
import com.google.protobuf.Message import com.google.protobuf.Message
import scala.annotation.tailrec import scala.annotation.tailrec
@ -37,12 +37,10 @@ object ProtobufSerializer {
/** /**
* This Serializer serializes `com.google.protobuf.Message`s * This Serializer serializes `com.google.protobuf.Message`s
*/ */
class ProtobufSerializer extends Serializer { class ProtobufSerializer(val system: ExtendedActorSystem) extends BaseSerializer {
private val parsingMethodBindingRef = new AtomicReference[Map[Class[_], Method]](Map.empty) private val parsingMethodBindingRef = new AtomicReference[Map[Class[_], Method]](Map.empty)
override def identifier: Int = 2
override def includeManifest: Boolean = true override def includeManifest: Boolean = true
override def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): AnyRef = { override def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): AnyRef = {