Merge pull request #16429 from hepin1989/master
=rem #16428 optimize protobuf serializer for master
This commit is contained in:
commit
f0be7f8a21
1 changed files with 39 additions and 13 deletions
|
|
@ -4,12 +4,18 @@
|
|||
|
||||
package akka.remote.serialization
|
||||
|
||||
import akka.actor.{ ExtendedActorSystem, ActorRef }
|
||||
import java.lang.reflect.Method
|
||||
import java.util.concurrent.atomic.AtomicReference
|
||||
|
||||
import akka.actor.{ ActorRef, ExtendedActorSystem }
|
||||
import akka.remote.WireFormats.ActorRefData
|
||||
import akka.serialization.{ Serializer, Serialization }
|
||||
import akka.serialization.{ Serialization, Serializer }
|
||||
import com.google.protobuf.Message
|
||||
|
||||
import scala.annotation.tailrec
|
||||
|
||||
object ProtobufSerializer {
|
||||
private val ARRAY_OF_BYTE_ARRAY = Array[Class[_]](classOf[Array[Byte]])
|
||||
|
||||
/**
|
||||
* Helper to serialize an [[akka.actor.ActorRef]] to Akka's
|
||||
|
|
@ -32,18 +38,38 @@ object ProtobufSerializer {
|
|||
* This Serializer serializes `com.google.protobuf.Message`s
|
||||
*/
|
||||
class ProtobufSerializer extends Serializer {
|
||||
val ARRAY_OF_BYTE_ARRAY = Array[Class[_]](classOf[Array[Byte]])
|
||||
def includeManifest: Boolean = true
|
||||
def identifier = 2
|
||||
|
||||
def toBinary(obj: AnyRef): Array[Byte] = obj match {
|
||||
case m: Message ⇒ m.toByteArray
|
||||
case _ ⇒ throw new IllegalArgumentException("Can't serialize a non-protobuf message using protobuf [" + obj + "]")
|
||||
private val parsingMethodBindingRef = new AtomicReference[Map[Class[_], Method]](Map.empty)
|
||||
|
||||
override def identifier: Int = 2
|
||||
|
||||
override def includeManifest: Boolean = true
|
||||
|
||||
override def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): AnyRef = {
|
||||
manifest match {
|
||||
case Some(clazz) ⇒
|
||||
@tailrec
|
||||
def parsingMethod(method: Method = null): Method = {
|
||||
val parsingMethodBinding = parsingMethodBindingRef.get()
|
||||
parsingMethodBinding.get(clazz) match {
|
||||
case Some(cachedParsingMethod) ⇒ cachedParsingMethod
|
||||
case None ⇒
|
||||
import ProtobufSerializer.ARRAY_OF_BYTE_ARRAY
|
||||
val unCachedParsingMethod = if (method eq null) clazz.getDeclaredMethod("parseFrom", ARRAY_OF_BYTE_ARRAY: _*) else method
|
||||
if (parsingMethodBindingRef.compareAndSet(parsingMethodBinding, parsingMethodBinding.updated(clazz, unCachedParsingMethod)))
|
||||
unCachedParsingMethod
|
||||
else
|
||||
parsingMethod(unCachedParsingMethod)
|
||||
}
|
||||
}
|
||||
parsingMethod().invoke(null, bytes).asInstanceOf[Message]
|
||||
|
||||
case None ⇒ throw new IllegalArgumentException("Need a protobuf message class to be able to serialize bytes using protobuf")
|
||||
}
|
||||
}
|
||||
|
||||
def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef =
|
||||
clazz match {
|
||||
case None ⇒ throw new IllegalArgumentException("Need a protobuf message class to be able to serialize bytes using protobuf")
|
||||
case Some(c) ⇒ c.getDeclaredMethod("parseFrom", ARRAY_OF_BYTE_ARRAY: _*).invoke(null, bytes).asInstanceOf[Message]
|
||||
}
|
||||
override def toBinary(obj: AnyRef): Array[Byte] = obj match {
|
||||
case message: Message ⇒ message.toByteArray
|
||||
case _ ⇒ throw new IllegalArgumentException(s"Can't serialize a non-protobuf message using protobuf [$obj]")
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue