diff --git a/akka-actor/src/main/resources/reference.conf b/akka-actor/src/main/resources/reference.conf index 198eaf8846..9d1e6fa1d1 100644 --- a/akka-actor/src/main/resources/reference.conf +++ b/akka-actor/src/main/resources/reference.conf @@ -492,6 +492,17 @@ akka { "[B" = bytes "java.io.Serializable" = java } + + # Configuration namespace of serialization identifiers. + # Each serializer implementation must have an entry in the following format: + # `akka.actor.serialization-identifiers."FQCN" = ID` + # where `FQCN` is fully qualified class name of the serializer implementation + # and `ID` is globally unique serializer identifier number. + # Identifier values from 0 to 16 are reserved for Akka internal usage. + serialization-identifiers { + "akka.serialization.JavaSerializer" = 1 + "akka.serialization.ByteArraySerializer" = 4 + } # Configuration items which are used by the akka.actor.ActorDSL._ methods dsl { diff --git a/akka-actor/src/main/scala/akka/serialization/Serializer.scala b/akka-actor/src/main/scala/akka/serialization/Serializer.scala index 622df661b2..14b23a605d 100644 --- a/akka-actor/src/main/scala/akka/serialization/Serializer.scala +++ b/akka-actor/src/main/scala/akka/serialization/Serializer.scala @@ -33,8 +33,8 @@ import akka.serialization.JavaSerializer.CurrentSystem trait Serializer { /** - * Completely unique value to identify this implementation of Serializer, used to optimize network traffic - * Values from 0 to 16 is reserved for Akka internal usage + * Completely unique value to identify this implementation of Serializer, used to optimize network traffic. + * Values from 0 to 16 are reserved for Akka internal usage. */ def identifier: Int @@ -65,6 +65,33 @@ trait Serializer { final def fromBinary(bytes: Array[Byte], clazz: Class[_]): AnyRef = fromBinary(bytes, Option(clazz)) } +/** + * Base serializer trait with serialization identifiers configuration contract, + * when globally unique serialization identifier is configured in the `reference.conf`. + */ +trait BaseSerializer extends Serializer { + /** + * Actor system which is required by most serializer implementations. + */ + val system: ExtendedActorSystem + /** + * Configuration namespace of serialization identifiers in the `reference.conf`. + * + * Each serializer implementation must have an entry in the following format: + * `akka.actor.serialization-identifiers."FQCN" = ID` + * where `FQCN` is fully qualified class name of the serializer implementation + * and `ID` is globally unique serializer identifier number. + */ + final val SerializationIdentifiers = "akka.actor.serialization-identifiers" + /** + * Globally unique serialization identifier configured in the `reference.conf`. + * + * See [[Serializer#identifier()]]. + */ + final override val identifier: Int = + system.settings.config.getInt(s"""${SerializationIdentifiers}."${getClass.getName}"""") +} + /** * Java API for creating a Serializer: make sure to include a constructor which * takes exactly one argument of type [[akka.actor.ExtendedActorSystem]], because @@ -117,12 +144,10 @@ object JavaSerializer { /** * This Serializer uses standard Java Serialization */ -class JavaSerializer(val system: ExtendedActorSystem) extends Serializer { +class JavaSerializer(val system: ExtendedActorSystem) extends BaseSerializer { def includeManifest: Boolean = false - def identifier = 1 - def toBinary(o: AnyRef): Array[Byte] = { val bos = new ByteArrayOutputStream val out = new ObjectOutputStream(bos) @@ -154,9 +179,8 @@ class NullSerializer extends Serializer { * This is a special Serializer that Serializes and deserializes byte arrays only, * (just returns the byte array unchanged/uncopied) */ -class ByteArraySerializer extends Serializer { +class ByteArraySerializer(val system: ExtendedActorSystem) extends BaseSerializer { def includeManifest: Boolean = false - def identifier = 4 def toBinary(o: AnyRef) = o match { case null ⇒ null case o: Array[Byte] ⇒ o diff --git a/akka-actor/src/main/scala/akka/util/ByteString.scala b/akka-actor/src/main/scala/akka/util/ByteString.scala index 91d7dff6e8..9e72959db9 100644 --- a/akka-actor/src/main/scala/akka/util/ByteString.scala +++ b/akka-actor/src/main/scala/akka/util/ByteString.scala @@ -66,7 +66,7 @@ object ByteString { */ @varargs def fromInts(array: Int*): ByteString = - apply(array:_*)(scala.math.Numeric.IntIsIntegral) + apply(array: _*)(scala.math.Numeric.IntIsIntegral) /** * Creates a new ByteString which will contain the UTF-8 representation of the given String diff --git a/akka-cluster-metrics/src/main/resources/reference.conf b/akka-cluster-metrics/src/main/resources/reference.conf index b00a354bb6..876746a7fb 100644 --- a/akka-cluster-metrics/src/main/resources/reference.conf +++ b/akka-cluster-metrics/src/main/resources/reference.conf @@ -29,8 +29,6 @@ akka.cluster.metrics { # Sigar native library extract location. # Use per-application-instance scoped location, such as program working directory. native-library-extract-folder = ${user.dir}"/native" - # Unique serialization identifier. Must not conflict with any other in an akka system. - serializer-identifier = 10 # Metrics supervisor actor. supervisor { # Actor name. Example name space: /system/cluster-metrics @@ -97,6 +95,10 @@ akka.actor { serialization-bindings { "akka.cluster.metrics.ClusterMetricsMessage" = akka-cluster-metrics } + # Globally unique metrics extension serializer identifier. + serialization-identifiers { + "akka.cluster.metrics.protobuf.MessageSerializer" = 10 + } # Provide routing of messages based on cluster metrics. router.type-mapping { cluster-metrics-adaptive-pool = "akka.cluster.metrics.AdaptiveLoadBalancingPool" diff --git a/akka-cluster-metrics/src/main/scala/akka/cluster/metrics/ClusterMetricsSettings.scala b/akka-cluster-metrics/src/main/scala/akka/cluster/metrics/ClusterMetricsSettings.scala index 93d06660cc..7d76dcf983 100644 --- a/akka-cluster-metrics/src/main/scala/akka/cluster/metrics/ClusterMetricsSettings.scala +++ b/akka-cluster-metrics/src/main/scala/akka/cluster/metrics/ClusterMetricsSettings.scala @@ -25,7 +25,6 @@ case class ClusterMetricsSettings(config: Config) { } val PeriodicTasksInitialDelay: FiniteDuration = cc.getMillisDuration("periodic-tasks-initial-delay") val NativeLibraryExtractFolder: String = cc.getString("native-library-extract-folder") - val SerializerIdentifier: Int = cc.getInt("serializer-identifier") // Supervisor. val SupervisorName: String = cc.getString("supervisor.name") diff --git a/akka-cluster-metrics/src/main/scala/akka/cluster/metrics/protobuf/MessageSerializer.scala b/akka-cluster-metrics/src/main/scala/akka/cluster/metrics/protobuf/MessageSerializer.scala index 661cfdc401..fb67db2ac7 100644 --- a/akka-cluster-metrics/src/main/scala/akka/cluster/metrics/protobuf/MessageSerializer.scala +++ b/akka-cluster-metrics/src/main/scala/akka/cluster/metrics/protobuf/MessageSerializer.scala @@ -11,7 +11,7 @@ import java.{ lang ⇒ jl } import akka.actor.{ Address, ExtendedActorSystem } import akka.cluster.metrics.protobuf.msg.{ ClusterMetricsMessages ⇒ cm } import akka.cluster.metrics.{ ClusterMetricsMessage, ClusterMetricsSettings, EWMA, Metric, MetricsGossip, MetricsGossipEnvelope, NodeMetrics } -import akka.serialization.Serializer +import akka.serialization.BaseSerializer import akka.util.ClassLoaderObjectInputStream import com.google.protobuf.{ ByteString, MessageLite } @@ -21,7 +21,7 @@ import scala.collection.JavaConverters.{ asJavaIterableConverter, asScalaBufferC /** * Protobuf serializer for [[ClusterMetricsMessage]] types. */ -class MessageSerializer(val system: ExtendedActorSystem) extends Serializer { +class MessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer { private final val BufferSize = 4 * 1024 @@ -30,8 +30,6 @@ class MessageSerializer(val system: ExtendedActorSystem) extends Serializer { override val includeManifest: Boolean = true - override val identifier = ClusterMetricsSettings(system.settings.config).SerializerIdentifier - override def toBinary(obj: AnyRef): Array[Byte] = obj match { case m: MetricsGossipEnvelope ⇒ compress(metricsGossipEnvelopeToProto(m)) diff --git a/akka-cluster-metrics/src/test/scala/akka/cluster/metrics/ClusterMetricsSettingsSpec.scala b/akka-cluster-metrics/src/test/scala/akka/cluster/metrics/ClusterMetricsSettingsSpec.scala index 939fb1628d..723041d08f 100644 --- a/akka-cluster-metrics/src/test/scala/akka/cluster/metrics/ClusterMetricsSettingsSpec.scala +++ b/akka-cluster-metrics/src/test/scala/akka/cluster/metrics/ClusterMetricsSettingsSpec.scala @@ -25,7 +25,6 @@ class ClusterMetricsSettingsSpec extends AkkaSpec { MetricsDispatcher should be(Dispatchers.DefaultDispatcherId) PeriodicTasksInitialDelay should be(1 second) NativeLibraryExtractFolder should be(System.getProperty("user.dir") + "/native") - SerializerIdentifier should be(10) // Supervisor. SupervisorName should be("cluster-metrics") diff --git a/akka-cluster/src/main/resources/reference.conf b/akka-cluster/src/main/resources/reference.conf index bba463988a..d94ab0da5b 100644 --- a/akka-cluster/src/main/resources/reference.conf +++ b/akka-cluster/src/main/resources/reference.conf @@ -226,6 +226,10 @@ akka { "akka.cluster.ClusterMessage" = akka-cluster } + serialization-identifiers { + "akka.cluster.protobuf.ClusterMessageSerializer" = 5 + } + router.type-mapping { adaptive-pool = "akka.cluster.routing.AdaptiveLoadBalancingPool" adaptive-group = "akka.cluster.routing.AdaptiveLoadBalancingGroup" diff --git a/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala b/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala index c6de909716..ddf29b6ade 100644 --- a/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala +++ b/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala @@ -10,7 +10,7 @@ import java.{ lang ⇒ jl } import akka.actor.{ Address, ExtendedActorSystem } import akka.cluster._ import akka.cluster.protobuf.msg.{ ClusterMessages ⇒ cm } -import akka.serialization.Serializer +import akka.serialization.BaseSerializer import akka.util.ClassLoaderObjectInputStream import com.google.protobuf.{ ByteString, MessageLite } @@ -22,7 +22,7 @@ import scala.concurrent.duration.Deadline /** * Protobuf serializer of cluster messages. */ -class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializer { +class ClusterMessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer { private final val BufferSize = 1024 * 4 // must be lazy because serializer is initialized from Cluster extension constructor @@ -53,8 +53,6 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializ def includeManifest: Boolean = true - def identifier = 5 - def toBinary(obj: AnyRef): Array[Byte] = obj match { case ClusterHeartbeatSender.Heartbeat(from) ⇒ addressToProtoByteArray(from) case ClusterHeartbeatSender.HeartbeatRsp(from) ⇒ uniqueAddressToProtoByteArray(from) diff --git a/akka-contrib/src/main/resources/reference.conf b/akka-contrib/src/main/resources/reference.conf index a858df7221..d8c942b19f 100644 --- a/akka-contrib/src/main/resources/reference.conf +++ b/akka-contrib/src/main/resources/reference.conf @@ -40,6 +40,9 @@ akka.actor { serialization-bindings { "akka.contrib.pattern.DistributedPubSubMessage" = akka-pubsub } + serialization-identifiers { + "akka.contrib.pattern.protobuf.DistributedPubSubMessageSerializer" = 9 + } } diff --git a/akka-contrib/src/main/scala/akka/contrib/pattern/protobuf/DistributedPubSubMessageSerializer.scala b/akka-contrib/src/main/scala/akka/contrib/pattern/protobuf/DistributedPubSubMessageSerializer.scala index fa9c43d0e4..3671c866ba 100644 --- a/akka-contrib/src/main/scala/akka/contrib/pattern/protobuf/DistributedPubSubMessageSerializer.scala +++ b/akka-contrib/src/main/scala/akka/contrib/pattern/protobuf/DistributedPubSubMessageSerializer.scala @@ -3,7 +3,7 @@ */ package akka.contrib.pattern.protobuf -import akka.serialization.Serializer +import akka.serialization.BaseSerializer import akka.cluster._ import scala.collection.breakOut import akka.actor.{ ExtendedActorSystem, Address } @@ -31,7 +31,7 @@ import scala.collection.immutable.TreeMap /** * Protobuf serializer of DistributedPubSubMediator messages. */ -class DistributedPubSubMessageSerializer(val system: ExtendedActorSystem) extends Serializer { +class DistributedPubSubMessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer { private final val BufferSize = 1024 * 4 @@ -44,8 +44,6 @@ class DistributedPubSubMessageSerializer(val system: ExtendedActorSystem) extend def includeManifest: Boolean = true - def identifier = 9 - def toBinary(obj: AnyRef): Array[Byte] = obj match { case m: Status ⇒ compress(statusToProto(m)) case m: Delta ⇒ compress(deltaToProto(m)) diff --git a/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala b/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala index 7d056f1a31..b9c10950ff 100644 --- a/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala +++ b/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala @@ -23,15 +23,13 @@ trait Message extends Serializable /** * Protobuf serializer for [[PersistentRepr]] and [[AtLeastOnceDelivery]] messages. */ -class MessageSerializer(val system: ExtendedActorSystem) extends Serializer { +class MessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer { import PersistentRepr.Undefined val PersistentReprClass = classOf[PersistentRepr] val PersistentImplClass = classOf[PersistentImpl] val AtLeastOnceDeliverySnapshotClass = classOf[AtLeastOnceDeliverySnap] - val SerializationIdentifiers = "akka.actor.serialization-identifiers" // TODO move to [[Serializer]] - override val identifier: Int = system.settings.config.getInt(s"""${SerializationIdentifiers}."${getClass.getName}"""") override val includeManifest: Boolean = true private lazy val transportInformation: Option[Serialization.Information] = { diff --git a/akka-persistence/src/main/scala/akka/persistence/serialization/SnapshotSerializer.scala b/akka-persistence/src/main/scala/akka/persistence/serialization/SnapshotSerializer.scala index 7af2ef9e85..889e8ff9e7 100644 --- a/akka-persistence/src/main/scala/akka/persistence/serialization/SnapshotSerializer.scala +++ b/akka-persistence/src/main/scala/akka/persistence/serialization/SnapshotSerializer.scala @@ -7,8 +7,7 @@ package akka.persistence.serialization import java.io._ import akka.actor._ -import akka.serialization.{ Serializer, SerializationExtension } -import akka.serialization.Serialization +import akka.serialization._ import scala.util.Success import scala.util.Failure @@ -30,10 +29,8 @@ private[serialization] final case class SnapshotHeader(serializerId: Int, manife /** * [[Snapshot]] serializer. */ -class SnapshotSerializer(system: ExtendedActorSystem) extends Serializer { +class SnapshotSerializer(val system: ExtendedActorSystem) extends BaseSerializer { - val SerializationIdentifiers = "akka.actor.serialization-identifiers" // TODO move to [[Serializer]] - override val identifier: Int = system.settings.config.getInt(s"""${SerializationIdentifiers}."${getClass.getName}"""") override val includeManifest: Boolean = false private lazy val transportInformation: Option[Serialization.Information] = { diff --git a/akka-remote/src/main/resources/reference.conf b/akka-remote/src/main/resources/reference.conf index 77b7a6d27f..85c553c78d 100644 --- a/akka-remote/src/main/resources/reference.conf +++ b/akka-remote/src/main/resources/reference.conf @@ -18,7 +18,6 @@ akka { daemon-create = "akka.remote.serialization.DaemonMsgCreateSerializer" } - serialization-bindings { # Since com.google.protobuf.Message does not extend Serializable but # GeneratedMessage does, need to use the more specific one here in order @@ -28,6 +27,12 @@ akka { "akka.remote.DaemonMsgCreate" = daemon-create } + serialization-identifiers { + "akka.remote.serialization.ProtobufSerializer" = 2 + "akka.remote.serialization.DaemonMsgCreateSerializer" = 3 + "akka.remote.serialization.MessageContainerSerializer" = 6 + } + deployment { default { diff --git a/akka-remote/src/main/scala/akka/remote/serialization/DaemonMsgCreateSerializer.scala b/akka-remote/src/main/scala/akka/remote/serialization/DaemonMsgCreateSerializer.scala index 5d28d0da70..6cea45ee17 100644 --- a/akka-remote/src/main/scala/akka/remote/serialization/DaemonMsgCreateSerializer.scala +++ b/akka-remote/src/main/scala/akka/remote/serialization/DaemonMsgCreateSerializer.scala @@ -4,7 +4,7 @@ package akka.remote.serialization -import akka.serialization.{ Serializer, SerializationExtension } +import akka.serialization.{ BaseSerializer, SerializationExtension } import java.io.Serializable import com.google.protobuf.ByteString import com.typesafe.config.{ Config, ConfigFactory } @@ -24,13 +24,13 @@ import util.{ Failure, Success } * * INTERNAL API */ -private[akka] class DaemonMsgCreateSerializer(val system: ExtendedActorSystem) extends Serializer { +private[akka] class DaemonMsgCreateSerializer(val system: ExtendedActorSystem) extends BaseSerializer { import ProtobufSerializer.serializeActorRef import ProtobufSerializer.deserializeActorRef import Deploy.NoDispatcherGiven def includeManifest: Boolean = false - def identifier = 3 + lazy val serialization = SerializationExtension(system) def toBinary(obj: AnyRef): Array[Byte] = obj match { diff --git a/akka-remote/src/main/scala/akka/remote/serialization/MessageContainerSerializer.scala b/akka-remote/src/main/scala/akka/remote/serialization/MessageContainerSerializer.scala index 0f6b82e085..56966133fe 100644 --- a/akka-remote/src/main/scala/akka/remote/serialization/MessageContainerSerializer.scala +++ b/akka-remote/src/main/scala/akka/remote/serialization/MessageContainerSerializer.scala @@ -13,11 +13,9 @@ import akka.actor.SelectParent import akka.actor.SelectionPathElement import akka.remote.ContainerFormats import akka.serialization.SerializationExtension -import akka.serialization.Serializer +import akka.serialization.BaseSerializer -class MessageContainerSerializer(val system: ExtendedActorSystem) extends Serializer { - - def identifier: Int = 6 +class MessageContainerSerializer(val system: ExtendedActorSystem) extends BaseSerializer { def includeManifest: Boolean = false diff --git a/akka-remote/src/main/scala/akka/remote/serialization/ProtobufSerializer.scala b/akka-remote/src/main/scala/akka/remote/serialization/ProtobufSerializer.scala index 019bcf61fd..0b825cb91e 100644 --- a/akka-remote/src/main/scala/akka/remote/serialization/ProtobufSerializer.scala +++ b/akka-remote/src/main/scala/akka/remote/serialization/ProtobufSerializer.scala @@ -9,7 +9,7 @@ import java.util.concurrent.atomic.AtomicReference import akka.actor.{ ActorRef, ExtendedActorSystem } import akka.remote.WireFormats.ActorRefData -import akka.serialization.{ Serialization, Serializer } +import akka.serialization.{ Serialization, BaseSerializer } import com.google.protobuf.Message import scala.annotation.tailrec @@ -37,12 +37,10 @@ object ProtobufSerializer { /** * This Serializer serializes `com.google.protobuf.Message`s */ -class ProtobufSerializer extends Serializer { +class ProtobufSerializer(val system: ExtendedActorSystem) extends BaseSerializer { private val parsingMethodBindingRef = new AtomicReference[Map[Class[_], Method]](Map.empty) - override def identifier: Int = 2 - override def includeManifest: Boolean = true override def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): AnyRef = {