=rem #16428 optimize protobuf serializer
This commit is contained in:
parent
c56d670c03
commit
a19e02b9e5
1 changed files with 39 additions and 13 deletions
|
|
@ -4,12 +4,18 @@
|
||||||
|
|
||||||
package akka.remote.serialization
|
package akka.remote.serialization
|
||||||
|
|
||||||
import akka.actor.{ ExtendedActorSystem, ActorRef }
|
import java.lang.reflect.Method
|
||||||
|
import java.util.concurrent.atomic.AtomicReference
|
||||||
|
|
||||||
|
import akka.actor.{ ActorRef, ExtendedActorSystem }
|
||||||
import akka.remote.WireFormats.ActorRefData
|
import akka.remote.WireFormats.ActorRefData
|
||||||
import akka.serialization.{ Serializer, Serialization }
|
import akka.serialization.{ Serialization, Serializer }
|
||||||
import com.google.protobuf.Message
|
import com.google.protobuf.Message
|
||||||
|
|
||||||
|
import scala.annotation.tailrec
|
||||||
|
|
||||||
object ProtobufSerializer {
|
object ProtobufSerializer {
|
||||||
|
private val ARRAY_OF_BYTE_ARRAY = Array[Class[_]](classOf[Array[Byte]])
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Helper to serialize an [[akka.actor.ActorRef]] to Akka's
|
* Helper to serialize an [[akka.actor.ActorRef]] to Akka's
|
||||||
|
|
@ -32,18 +38,38 @@ object ProtobufSerializer {
|
||||||
* This Serializer serializes `com.google.protobuf.Message`s
|
* This Serializer serializes `com.google.protobuf.Message`s
|
||||||
*/
|
*/
|
||||||
class ProtobufSerializer extends Serializer {
|
class ProtobufSerializer extends Serializer {
|
||||||
val ARRAY_OF_BYTE_ARRAY = Array[Class[_]](classOf[Array[Byte]])
|
|
||||||
def includeManifest: Boolean = true
|
|
||||||
def identifier = 2
|
|
||||||
|
|
||||||
def toBinary(obj: AnyRef): Array[Byte] = obj match {
|
private val parsingMethodBindingRef = new AtomicReference[Map[Class[_], Method]](Map.empty)
|
||||||
case m: Message ⇒ m.toByteArray
|
|
||||||
case _ ⇒ throw new IllegalArgumentException("Can't serialize a non-protobuf message using protobuf [" + obj + "]")
|
override def identifier: Int = 2
|
||||||
|
|
||||||
|
override def includeManifest: Boolean = true
|
||||||
|
|
||||||
|
override def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): AnyRef = {
|
||||||
|
manifest match {
|
||||||
|
case Some(clazz) ⇒
|
||||||
|
@tailrec
|
||||||
|
def parsingMethod(method: Method = null): Method = {
|
||||||
|
val parsingMethodBinding = parsingMethodBindingRef.get()
|
||||||
|
parsingMethodBinding.get(clazz) match {
|
||||||
|
case Some(cachedParsingMethod) ⇒ cachedParsingMethod
|
||||||
|
case None ⇒
|
||||||
|
import ProtobufSerializer.ARRAY_OF_BYTE_ARRAY
|
||||||
|
val unCachedParsingMethod = if (method eq null) clazz.getDeclaredMethod("parseFrom", ARRAY_OF_BYTE_ARRAY: _*) else method
|
||||||
|
if (parsingMethodBindingRef.compareAndSet(parsingMethodBinding, parsingMethodBinding.updated(clazz, unCachedParsingMethod)))
|
||||||
|
unCachedParsingMethod
|
||||||
|
else
|
||||||
|
parsingMethod(unCachedParsingMethod)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
parsingMethod().invoke(null, bytes).asInstanceOf[Message]
|
||||||
|
|
||||||
|
case None ⇒ throw new IllegalArgumentException("Need a protobuf message class to be able to serialize bytes using protobuf")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef =
|
override def toBinary(obj: AnyRef): Array[Byte] = obj match {
|
||||||
clazz match {
|
case message: Message ⇒ message.toByteArray
|
||||||
case None ⇒ throw new IllegalArgumentException("Need a protobuf message class to be able to serialize bytes using protobuf")
|
case _ ⇒ throw new IllegalArgumentException(s"Can't serialize a non-protobuf message using protobuf [$obj]")
|
||||||
case Some(c) ⇒ c.getDeclaredMethod("parseFrom", ARRAY_OF_BYTE_ARRAY: _*).invoke(null, bytes).asInstanceOf[Message]
|
}
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue