=rem #13783 Make ProtobufSerializer independent of protobuf version

* well, as long as they provide the parseFrom and toByteArray
* it is using reflection to find the `parseFrom` and `toByteArray` methods to avoid
  dependency to `com.google.protobuf`.
* also special case com.google.protobuf when loading serialization binding
* migration guide
* mima filters for the serializers (all types changed)
* add real test for ProtobufSerializer
This commit is contained in:
Patrik Nordwall 2015-08-31 12:38:07 +02:00
parent c3ecb87a65
commit f8c1671903
7 changed files with 87 additions and 12 deletions

View file

@ -205,7 +205,15 @@ class Serialization(val system: ExtendedActorSystem) extends Extension {
* It is primarily ordered by the most specific classes first, and secondly in the configured order.
*/
private[akka] val bindings: immutable.Seq[ClassSerializer] =
sort(for ((k: String, v: String) settings.SerializationBindings if v != "none") yield (system.dynamicAccess.getClassFor[Any](k).get, serializers(v))).to[immutable.Seq]
sort(for ((k: String, v: String) settings.SerializationBindings if v != "none" && checkGoogleProtobuf(k))
yield (system.dynamicAccess.getClassFor[Any](k).get, serializers(v))).to[immutable.Seq]
// com.google.protobuf serialization binding is only used if the class can be loaded,
// i.e. com.google.protobuf dependency has been added in the application project.
// The reason for this special case is for backwards compatibility so that we still can
// include "com.google.protobuf.GeneratedMessage" = proto in configured serialization-bindings.
private def checkGoogleProtobuf(className: String): Boolean =
(!className.startsWith("com.google.protobuf") || system.dynamicAccess.getClassFor[Any](className).isSuccess)
/**
* Sort so that subtypes always precede their supertypes, but without

View file

@ -70,6 +70,18 @@ The following, previously deprecated, features have been removed:
* Java API TestKit.dilated, moved to JavaTestKit.dilated
Protobuf Dependency
===================
The transitive dependency to Protobuf has been removed to make it possible to use any version
of Protobuf for the application messages. If you use Protobuf in your application you need
to add the following dependency with desired version number::
"com.google.protobuf" % "protobuf-java" % "2.5.0"
Internally Akka is using an embedded version of protobuf that corresponds to ``com.google.protobuf/protobuf-java``
version 2.5.0. The package name of the embedded classes has been changed to ``akka.protobuf``.
Added parameter validation to RootActorPath
===========================================
Previously ``akka.actor.RootActorPath`` allowed passing in arbitrary strings into its name parameter,

View file

@ -19,12 +19,21 @@ akka {
}
serialization-bindings {
"akka.actor.ActorSelectionMessage" = akka-containers
"akka.remote.DaemonMsgCreate" = daemon-create
# Since akka.protobuf.Message does not extend Serializable but
# GeneratedMessage does, need to use the more specific one here in order
# to avoid ambiguity
"akka.actor.ActorSelectionMessage" = akka-containers
# to avoid ambiguity.
"akka.protobuf.GeneratedMessage" = proto
"akka.remote.DaemonMsgCreate" = daemon-create
# Since com.google.protobuf.Message does not extend Serializable but
# GeneratedMessage does, need to use the more specific one here in order
# to avoid ambiguity.
# This com.google.protobuf serialization binding is only used if the class can be loaded,
# i.e. com.google.protobuf dependency has been added in the application project.
"com.google.protobuf.GeneratedMessage" = proto
}
serialization-identifiers {

View file

@ -10,7 +10,6 @@ import java.util.concurrent.atomic.AtomicReference
import akka.actor.{ ActorRef, ExtendedActorSystem }
import akka.remote.WireFormats.ActorRefData
import akka.serialization.{ Serialization, BaseSerializer }
import akka.protobuf.Message
import scala.annotation.tailrec
@ -35,7 +34,9 @@ object ProtobufSerializer {
}
/**
* This Serializer serializes `akka.protobuf.Message`s
* This Serializer serializes `akka.protobuf.Message` and `com.google.protobuf.Message`
* It is using reflection to find the `parseFrom` and `toByteArray` methods to avoid
* dependency to `com.google.protobuf`.
*/
class ProtobufSerializer(val system: ExtendedActorSystem) extends BaseSerializer {
@ -51,6 +52,7 @@ class ProtobufSerializer(val system: ExtendedActorSystem) extends BaseSerializer
val ARRAY_OF_BYTE_ARRAY = Array[Class[_]](classOf[Array[Byte]])
private val parsingMethodBindingRef = new AtomicReference[Map[Class[_], Method]](Map.empty)
private val toByteArrayMethodBindingRef = new AtomicReference[Map[Class[_], Method]](Map.empty)
override def includeManifest: Boolean = true
@ -72,14 +74,29 @@ class ProtobufSerializer(val system: ExtendedActorSystem) extends BaseSerializer
parsingMethod(unCachedParsingMethod)
}
}
parsingMethod().invoke(null, bytes).asInstanceOf[Message]
parsingMethod().invoke(null, bytes)
case None throw new IllegalArgumentException("Need a protobuf message class to be able to serialize bytes using protobuf")
}
}
override def toBinary(obj: AnyRef): Array[Byte] = obj match {
case message: Message message.toByteArray
case _ throw new IllegalArgumentException(s"Can't serialize a non-protobuf message using protobuf [$obj]")
override def toBinary(obj: AnyRef): Array[Byte] = {
val clazz = obj.getClass
@tailrec
def toByteArrayMethod(method: Method = null): Method = {
val toByteArrayMethodBinding = toByteArrayMethodBindingRef.get()
toByteArrayMethodBinding.get(clazz) match {
case Some(cachedtoByteArrayMethod) cachedtoByteArrayMethod
case None
val unCachedtoByteArrayMethod =
if (method eq null) clazz.getMethod("toByteArray")
else method
if (toByteArrayMethodBindingRef.compareAndSet(toByteArrayMethodBinding, toByteArrayMethodBinding.updated(clazz, unCachedtoByteArrayMethod)))
unCachedtoByteArrayMethod
else
toByteArrayMethod(unCachedtoByteArrayMethod)
}
}
toByteArrayMethod().invoke(obj).asInstanceOf[Array[Byte]]
}
}

View file

@ -8,6 +8,8 @@ import akka.serialization.SerializationExtension
import akka.testkit.AkkaSpec
import akka.remote.WireFormats.SerializedMessage
import akka.remote.ProtobufProtocol.MyMessage
import akka.remote.MessageSerializer
import akka.actor.ExtendedActorSystem
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class ProtobufSerializerSpec extends AkkaSpec {
@ -21,6 +23,17 @@ class ProtobufSerializerSpec extends AkkaSpec {
ser.serializerFor(classOf[MyMessage]).getClass should ===(classOf[ProtobufSerializer])
}
"work for SerializedMessage (just an akka.protobuf message)" in {
// create a protobuf message
val protobufMessage = MessageSerializer.serialize(system.asInstanceOf[ExtendedActorSystem], "hello")
// serialize it with ProtobufSerializer
val bytes = ser.serialize(protobufMessage).get
// deserialize the bytes with ProtobufSerializer
val deserialized = ser.deserialize(bytes, protobufMessage.getClass).get.asInstanceOf[SerializedMessage]
deserialized.getSerializerId should ===(protobufMessage.getSerializerId)
deserialized.getMessage should ===(protobufMessage.getMessage) // same "hello"
}
}
}

View file

@ -87,7 +87,7 @@ object AkkaBuild extends Build {
lazy val actorTests = Project(
id = "akka-actor-tests",
base = file("akka-actor-tests"),
dependencies = Seq(testkit % "compile;test->test", protobuf)
dependencies = Seq(testkit % "compile;test->test")
)
lazy val benchJmh = Project(

View file

@ -23,6 +23,14 @@ object MiMa extends AutoPlugin {
case m: MemberProblem => m.ref.owner.fullName != name && m.ref.owner.fullName != (name + '$')
}
}
case class FilterAnyProblemStartingWith(start: String) extends com.typesafe.tools.mima.core.ProblemFilter {
import com.typesafe.tools.mima.core._
override def apply(p: Problem): Boolean = p match {
case t: TemplateProblem => !t.ref.fullName.startsWith(start)
case m: MemberProblem => !m.ref.owner.fullName.startsWith(start)
}
}
val mimaIgnoredProblems = {
import com.typesafe.tools.mima.core._
@ -550,7 +558,15 @@ object MiMa extends AutoPlugin {
// internal changes introduced by #17253
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.cluster.ClusterDaemon.coreSupervisor"),
ProblemFilters.exclude[MissingMethodProblem]("akka.cluster.ClusterCoreSupervisor.publisher"),
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.cluster.ClusterCoreSupervisor.coreDaemon")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.cluster.ClusterCoreSupervisor.coreDaemon"),
// protofbuf embedding #13783
FilterAnyProblemStartingWith("akka.remote.WireFormats"),
FilterAnyProblemStartingWith("akka.remote.ContainerFormats"),
FilterAnyProblemStartingWith("akka.remote.serialization.DaemonMsgCreateSerializer"),
FilterAnyProblemStartingWith("akka.remote.testconductor.TestConductorProtocol"),
FilterAnyProblemStartingWith("akka.cluster.protobuf.msg.ClusterMessages"),
FilterAnyProblemStartingWith("akka.cluster.protobuf.ClusterMessageSerializer")
)
}